Skip to content

Instantly share code, notes, and snippets.

@benjamintanweihao
Created May 4, 2017 16:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save benjamintanweihao/196f7bafd0040dd7a788e94cdad39f51 to your computer and use it in GitHub Desktop.
Save benjamintanweihao/196f7bafd0040dd7a788e94cdad39f51 to your computer and use it in GitHub Desktop.
Working example of 1-N Producer/Consumers, where each consumer receives the same thing.
defmodule Counter do
@moduledoc """
This is a simple producer that counts from the given
number whenever there is a demand.
"""
use GenStage
def start_link(initial) when is_integer(initial) do
GenStage.start_link(__MODULE__, initial, name: __MODULE__)
end
## Callbacks
def init(initial) do
# TODO: What happens when we play around with the dispatcher?
# TODO: What happens when we play around with the demand?
{:producer, initial, dispatcher: GenStage.BroadcastDispatcher, demand: :accumulate}
end
def handle_demand(demand, counter) when demand > 0 do
# If the counter is 3 and we ask for 2 items, we will
# emit the items 3 and 4, and set the state to 5.
events = Enum.to_list(counter..counter+demand-1)
Process.sleep(1000)
{:noreply, events, counter + 1}
end
end
defmodule ConsumerA do
@moduledoc """
A consumer will be a consumer supervisor that will
spawn printer tasks for each event.
"""
use ConsumerSupervisor
def start_link() do
ConsumerSupervisor.start_link(__MODULE__, :ok)
end
# Callbacks
def init(:ok) do
children = [
worker(PrinterA, [], restart: :temporary)
]
{:ok, children, strategy: :one_for_one, subscribe_to: [{Counter, max_demand: 1}]}
end
end
defmodule ConsumerB do
@moduledoc """
A consumer will be a consumer supervisor that will
spawn printer tasks for each event.
"""
use ConsumerSupervisor
def start_link() do
ConsumerSupervisor.start_link(__MODULE__, :ok)
end
# Callbacks
def init(:ok) do
children = [
worker(PrinterB, [], restart: :temporary)
]
{:ok, children, strategy: :one_for_one, subscribe_to: [{Counter, max_demand: 1}]}
end
end
defmodule ConsumerC do
@moduledoc """
A consumer will be a consumer supervisor that will
spawn printer tasks for each event.
"""
use ConsumerSupervisor
def start_link() do
ConsumerSupervisor.start_link(__MODULE__, :ok)
end
# Callbacks
def init(:ok) do
children = [
worker(PrinterC, [], restart: :temporary)
]
{:ok, children, strategy: :one_for_one, subscribe_to: [{Counter, max_demand: 1}]}
end
end
defmodule ConsumerD do
@moduledoc """
A consumer will be a consumer supervisor that will
spawn printer tasks for each event.
"""
use ConsumerSupervisor
def start_link() do
ConsumerSupervisor.start_link(__MODULE__, :ok)
end
# Callbacks
def init(:ok) do
children = [
worker(PrinterD, [], restart: :temporary)
]
{:ok, children, strategy: :one_for_one, subscribe_to: [{Counter, max_demand: 1}]}
end
end
defmodule PrinterA do
def start_link(event) do
Task.start_link(fn ->
IO.puts "A: #{inspect {self(), event}}"
end)
end
end
defmodule PrinterB do
def start_link(event) do
Task.start_link(fn ->
IO.puts "B: #{inspect {self(), event}}"
end)
end
end
defmodule PrinterC do
def start_link(event) do
Task.start_link(fn ->
IO.puts "C: #{inspect {self(), event}}"
end)
end
end
defmodule PrinterD do
def start_link(event) do
Task.start_link(fn ->
IO.puts "D: #{inspect {self(), event}}"
end)
end
end
defmodule App do
@moduledoc """
Your application entry-point.
For actual applications, start/0 should be start/2.
"""
def start do
{:ok, c} = Counter.start_link(0)
ConsumerA.start_link()
ConsumerB.start_link()
ConsumerC.start_link()
ConsumerD.start_link()
GenStage.demand(c, :forward)
end
end
# Start the app and wait forever
App.start
Process.sleep(:infinity)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment