Created
May 4, 2017 16:27
-
-
Save benjamintanweihao/196f7bafd0040dd7a788e94cdad39f51 to your computer and use it in GitHub Desktop.
Working example of 1-N Producer/Consumers, where each consumer receives the same thing.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Counter do
  @moduledoc """
  This is a simple producer that counts from the given
  number whenever there is a demand.

  Events are dispatched with `GenStage.BroadcastDispatcher`, and the
  producer is started with `demand: :accumulate`, so the demand mode has
  to be switched with `GenStage.demand/2` before `handle_demand/2` runs.
  """

  use GenStage

  # Starts the producer registered under the module name; `initial` is the
  # first integer that will be emitted.
  def start_link(initial) when is_integer(initial) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  ## Callbacks

  def init(initial) do
    # TODO: What happens when we play around with the dispatcher?
    # TODO: What happens when we play around with the demand?
    {:producer, initial, dispatcher: GenStage.BroadcastDispatcher, demand: :accumulate}
  end

  def handle_demand(demand, counter) when demand > 0 do
    # If the counter is 3 and we ask for 2 items, we will
    # emit the items 3 and 4, and set the state to 5.
    next_counter = counter + demand
    events = Enum.to_list(counter..(next_counter - 1))

    # Slow the producer down to one batch per second
    # (presumably so the demo output stays readable).
    Process.sleep(1000)

    # Advance by the number of events emitted, not by 1; otherwise any
    # demand greater than 1 would re-emit overlapping numbers next time.
    {:noreply, events, next_counter}
  end
end
defmodule ConsumerA do
  @moduledoc """
  Consumer supervisor subscribed to `Counter` that spawns a
  `PrinterA` task for each event.
  """

  use ConsumerSupervisor

  def start_link(), do: ConsumerSupervisor.start_link(__MODULE__, :ok)

  # Callbacks

  def init(:ok) do
    # Child template; the printers are declared temporary (restart: :temporary).
    printer_spec = worker(PrinterA, [], restart: :temporary)
    supervisor_opts = [strategy: :one_for_one, subscribe_to: [{Counter, max_demand: 1}]]

    {:ok, [printer_spec], supervisor_opts}
  end
end
defmodule ConsumerB do
  @moduledoc """
  Consumer supervisor subscribed to `Counter` that spawns a
  `PrinterB` task for each event.
  """

  use ConsumerSupervisor

  def start_link(), do: ConsumerSupervisor.start_link(__MODULE__, :ok)

  # Callbacks

  def init(:ok) do
    # Child template; the printers are declared temporary (restart: :temporary).
    printer_spec = worker(PrinterB, [], restart: :temporary)
    supervisor_opts = [strategy: :one_for_one, subscribe_to: [{Counter, max_demand: 1}]]

    {:ok, [printer_spec], supervisor_opts}
  end
end
defmodule ConsumerC do
  @moduledoc """
  Consumer supervisor subscribed to `Counter` that spawns a
  `PrinterC` task for each event.
  """

  use ConsumerSupervisor

  def start_link(), do: ConsumerSupervisor.start_link(__MODULE__, :ok)

  # Callbacks

  def init(:ok) do
    # Child template; the printers are declared temporary (restart: :temporary).
    printer_spec = worker(PrinterC, [], restart: :temporary)
    supervisor_opts = [strategy: :one_for_one, subscribe_to: [{Counter, max_demand: 1}]]

    {:ok, [printer_spec], supervisor_opts}
  end
end
defmodule ConsumerD do
  @moduledoc """
  Consumer supervisor subscribed to `Counter` that spawns a
  `PrinterD` task for each event.
  """

  use ConsumerSupervisor

  def start_link(), do: ConsumerSupervisor.start_link(__MODULE__, :ok)

  # Callbacks

  def init(:ok) do
    # Child template; the printers are declared temporary (restart: :temporary).
    printer_spec = worker(PrinterD, [], restart: :temporary)
    supervisor_opts = [strategy: :one_for_one, subscribe_to: [{Counter, max_demand: 1}]]

    {:ok, [printer_spec], supervisor_opts}
  end
end
defmodule PrinterA do
  @moduledoc """
  Prints a single event, tagged with "A", from a linked task.
  """

  # Starts a linked task that prints its own pid together with `event`.
  def start_link(event) do
    # `self()` is evaluated inside the task, so the task's pid is printed.
    print_event = fn -> IO.puts("A: " <> inspect({self(), event})) end
    Task.start_link(print_event)
  end
end
defmodule PrinterB do
  @moduledoc """
  Prints a single event, tagged with "B", from a linked task.
  """

  # Starts a linked task that prints its own pid together with `event`.
  def start_link(event) do
    # `self()` is evaluated inside the task, so the task's pid is printed.
    print_event = fn -> IO.puts("B: " <> inspect({self(), event})) end
    Task.start_link(print_event)
  end
end
defmodule PrinterC do
  @moduledoc """
  Prints a single event, tagged with "C", from a linked task.
  """

  # Starts a linked task that prints its own pid together with `event`.
  def start_link(event) do
    # `self()` is evaluated inside the task, so the task's pid is printed.
    print_event = fn -> IO.puts("C: " <> inspect({self(), event})) end
    Task.start_link(print_event)
  end
end
defmodule PrinterD do
  @moduledoc """
  Prints a single event, tagged with "D", from a linked task.
  """

  # Starts a linked task that prints its own pid together with `event`.
  def start_link(event) do
    # `self()` is evaluated inside the task, so the task's pid is printed.
    print_event = fn -> IO.puts("D: " <> inspect({self(), event})) end
    Task.start_link(print_event)
  end
end
defmodule App do
  @moduledoc """
  Your application entry-point.

  For actual applications, start/0 should be start/2.
  """

  # Starts the `Counter` producer at 0 and the four consumers, then switches
  # the producer's demand mode to `:forward`. Returns the result of
  # `GenStage.demand/2`.
  def start do
    {:ok, c} = Counter.start_link(0)

    # Match assertively so a consumer that fails to start crashes here
    # instead of being silently ignored.
    {:ok, _} = ConsumerA.start_link()
    {:ok, _} = ConsumerB.start_link()
    {:ok, _} = ConsumerC.start_link()
    {:ok, _} = ConsumerD.start_link()

    # The mode is switched only after all four consumers have been started.
    GenStage.demand(c, :forward)
  end
end
# Boot the producer and its consumers, then park the script process forever.
App.start()
Process.sleep(:infinity)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment