Created
February 20, 2018 13:43
-
-
Save vicmargar/c28d59dda98602447aac5b17efc16774 to your computer and use it in GitHub Desktop.
GenStage: ProducerConsumer vs Consumer Supervisor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require Logger | |
defmodule P do | |
use GenStage | |
@doc "Starts the broadcaster." | |
def start_link() do | |
GenStage.start_link(__MODULE__, :ok, name: __MODULE__) | |
end | |
@doc "Sends an event and returns only after the event is dispatched." | |
def sync_notify(event, timeout \\ 5000) do | |
GenStage.call(__MODULE__, {:notify, event}, timeout) | |
end | |
## Callbacks | |
def init(:ok) do | |
{:producer, {:queue.new, 0}, dispatcher: GenStage.BroadcastDispatcher} | |
end | |
def handle_call({:notify, event}, from, {queue, pending_demand}) do | |
queue = :queue.in({from, event}, queue) | |
dispatch_events(queue, pending_demand, []) | |
end | |
def handle_demand(incoming_demand, {queue, pending_demand}) do | |
dispatch_events(queue, incoming_demand + pending_demand, []) | |
end | |
defp dispatch_events(queue, 0, events) do | |
{:noreply, Enum.reverse(events), {queue, 0}} | |
end | |
defp dispatch_events(queue, demand, events) do | |
case :queue.out(queue) do | |
{{:value, {from, event}}, queue} -> | |
GenStage.reply(from, :ok) | |
dispatch_events(queue, demand - 1, [event | events]) | |
{:empty, queue} -> | |
{:noreply, Enum.reverse(events), {queue, demand}} | |
end | |
end | |
end | |
defmodule SimpleClient do | |
def process_event(event) do | |
Task.start_link(fn -> | |
process(event) | |
Process.sleep(1000) | |
end) | |
end | |
def process({event, :q}) do | |
Logger.info "#{Time.utc_now()} Processing events in Q: #{inspect(event)}" | |
end | |
def process({event, :av}) do | |
Logger.info "#{Time.utc_now()} Processing events in AV: #{inspect(event)}" | |
end | |
end | |
defmodule Client do | |
use GenStage | |
import SimpleClient | |
def start_link() do | |
GenStage.start_link(__MODULE__, :ok) | |
end | |
def init(:ok) do | |
{:consumer, :ok} | |
end | |
def process_events([]), do: [] | |
def process_events([event | events]) do | |
process(event) | |
Process.sleep(1000) | |
process_events(events) | |
end | |
def handle_events(events, _from, state) do | |
process_events(events) | |
# We are a consumer, so we would never emit items. | |
{:noreply, [], state} | |
end | |
end | |
defmodule PC do | |
use GenStage | |
def start_link(provider) do | |
GenStage.start_link(__MODULE__, provider) | |
end | |
def init(provider) do | |
{:producer_consumer, provider, subscribe_to: [{P, selector: fn {_event, p} -> p == provider end}]} | |
end | |
def handle_events(events, _from, provider) do | |
{:noreply, events, provider} | |
end | |
end | |
defmodule PCS do | |
use ConsumerSupervisor | |
def start_link(provider) do | |
ConsumerSupervisor.start_link(__MODULE__, provider) | |
end | |
def init(provider: provider, max_demand: max_demand) do | |
children = [%{id: SimpleClient, restart: :transient, start: {SimpleClient, :process_event, []}}] | |
opts = [strategy: :one_for_one, subscribe_to: [{P, max_demand: max_demand, selector: fn {_event, p} -> p == provider end}]] | |
ConsumerSupervisor.init(children, opts) | |
end | |
end | |
defmodule GenStageTest do | |
def test_pc do | |
# producer | |
{:ok, _p} = P.start_link() | |
# producer consumers | |
{:ok, qpc} = PC.start_link(:q) | |
{:ok, avpc} = PC.start_link(:av) | |
# consumers | |
{:ok, q1} = Client.start_link() | |
{:ok, av1} = Client.start_link() | |
{:ok, av2} = Client.start_link() | |
GenStage.sync_subscribe(q1, to: qpc) | |
GenStage.sync_subscribe(av1, to: avpc) | |
GenStage.sync_subscribe(av2, to: avpc) | |
P.sync_notify({"hello 1", :q}) | |
P.sync_notify({"hello 2", :q}) | |
P.sync_notify({"hello 3", :q}) | |
P.sync_notify({"hello 4", :q}) | |
P.sync_notify({"hello 5", :av}) | |
P.sync_notify({"hello 6", :av}) | |
P.sync_notify({"hello 7", :av}) | |
P.sync_notify({"hello 8", :av}) | |
end | |
def test_pcs do | |
# producer | |
{:ok, _p} = P.start_link() | |
# consumer supervisors | |
{:ok, _qpc} = PCS.start_link(provider: :q, max_demand: 1) | |
{:ok, _avpc} = PCS.start_link(provider: :av, max_demand: 2) | |
P.sync_notify({"hello 1", :q}) | |
P.sync_notify({"hello 2", :q}) | |
P.sync_notify({"hello 3", :q}) | |
P.sync_notify({"hello 4", :q}) | |
P.sync_notify({"hello 5", :av}) | |
P.sync_notify({"hello 6", :av}) | |
P.sync_notify({"hello 7", :av}) | |
P.sync_notify({"hello 8", :av}) | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Output:
The producer-consumer version works as expected with 3 concurrent processes (1 for :q, and 2 for :av)
The consumer supervisor version doesn't work as I'd expect.