Skip to content

Instantly share code, notes, and snippets.

@campezzi
Last active January 17, 2019 01:54
Show Gist options
  • Save campezzi/b8e71e98b1efa2ba1dc99e2178b0eda6 to your computer and use it in GitHub Desktop.
Save campezzi/b8e71e98b1efa2ba1dc99e2178b0eda6 to your computer and use it in GitHub Desktop.
Elixir - Using GenStage's ConsumerSupervisor to have a dynamic pool of consumer processes
defmodule MyApp.Application do
  @moduledoc """
  Application entry point.

  Starts the `Producer` stage and the `Consumer` supervisor under a single
  `:one_for_one` supervisor registered as `Stages.Supervisor`.
  """

  use Application

  @doc """
  Application callback: boots the supervision tree.

  Both arguments are ignored. Returns whatever `Supervisor.start_link/2`
  returns.
  """
  @impl true
  def start(_type, _args) do
    # Explicit child-spec maps are used instead of the deprecated
    # `Supervisor.Spec.worker/3` helper.
    children = [
      # `Producer.start_link/1` takes exactly one (ignored) argument, so a
      # single `[]` is passed; an empty argument list would call the
      # non-existent `Producer.start_link/0`.
      # The producer is listed first because the consumer subscribes to it
      # by its registered name.
      %{id: Producer, start: {Producer, :start_link, [[]]}},
      # `Consumer` is itself a supervisor, so it is declared as one.
      %{id: Consumer, start: {Consumer, :start_link, []}, type: :supervisor}
    ]

    opts = [strategy: :one_for_one, name: Stages.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
defmodule Producer do
  @moduledoc """
  GenStage producer that emits words from a fixed list, repeated as often as
  needed to satisfy the requested demand.
  """

  use GenStage

  @words ~w(lorem ipsum dolor sit amet)

  @doc """
  Starts the producer, registered under its own module name.

  The argument is ignored; the stage is always seeded with the built-in
  word list.
  """
  def start_link(_args) do
    GenStage.start_link(__MODULE__, @words, name: __MODULE__)
  end

  ## Callbacks

  @doc false
  @impl true
  def init(words), do: {:producer, words}

  @doc false
  @impl true
  def handle_demand(demand, words) when demand > 0 do
    IO.puts("Received demand: #{demand}")

    # The word list is cycled from its first element on every call and the
    # state is returned unchanged, so each batch starts again at the first word.
    events =
      words
      |> Stream.cycle()
      |> Enum.take(demand)

    {:noreply, events, words}
  end
end
defmodule Consumer do
  @moduledoc """
  `ConsumerSupervisor` that subscribes to `Producer` and starts one `Printer`
  child for each event it receives.
  """

  use ConsumerSupervisor

  @doc """
  Starts the consumer supervisor.

  The optional argument is ignored. It exists so the module can be started
  both as `Consumer.start_link()` and through a standard `child_spec/1`,
  which always passes one argument.
  """
  def start_link(_args \\ []) do
    ConsumerSupervisor.start_link(__MODULE__, :ok)
  end

  # Callbacks

  @doc false
  @impl true
  def init(:ok) do
    # A child-spec map replaces the deprecated `Supervisor.Spec.worker/3`.
    # The start argument list is empty because the supervisor appends the
    # event itself, which is why `Printer.start_link/1` takes one argument.
    # `:temporary` means a finished or crashed printer is never restarted.
    children = [
      %{id: Printer, start: {Printer, :start_link, []}, restart: :temporary}
    ]

    opts = [
      strategy: :one_for_one,
      subscribe_to: [{Producer, min_demand: 10, max_demand: 50}]
    ]

    ConsumerSupervisor.init(children, opts)
  end
end
defmodule Printer do
  @moduledoc """
  Short-lived worker that handles a single event: it waits a random number
  of seconds and then prints the event together with its own pid.
  """

  @doc """
  Starts a linked task that processes `event`.

  The task sleeps for a random whole number of seconds between 1 and 5 and
  then prints `{pid, event}` with `IO.inspect/1`.

  Returns the result of `Task.start_link/1`, i.e. `{:ok, pid}`.
  """
  def start_link(event) do
    Task.start_link(fn ->
      # Simulate a variable amount of work per event.
      seconds = Enum.random(1..5)
      Process.sleep(:timer.seconds(seconds))

      IO.inspect({self(), event})
    end)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment