Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Replacing GenEvent with DynamicSupervisor + GenServer
# file: lib/em_demo/some_handler.ex
#
defmodule EmDemo.AnotherHandler do
use GenServer
@impl true
def init([to_pid, token] = arg) do
IO.puts("#{__MODULE__}: init(#{inspect(arg)})")
{:ok, {to_pid, token}}
end
@impl true
def handle_cast({:another_event, data}, {pid, token} = state) do
IO.puts("#{__MODULE__} #{inspect(self())}: handle_cast({:another_event, #{inspect(data)}})")
Kernel.send(pid, token)
{:noreply, state}
end
def handle_cast(_dont_care, {pid, token} = state) do
IO.puts("#{__MODULE__} #{inspect(self())}: handle_cast(_dont_care)")
Kernel.send(pid, token)
{:noreply, state}
end
@impl true
def terminate(reason, state) do
IO.puts("#{__MODULE__} #{inspect(self())}: terminate(#{inspect(reason)}, #{inspect(state)})")
end
end
# file: lib/em_demo/application.ex
#
# Replacing GenEvent with DynamicSupervisor + GenServer
#
# Essentially an update to
#
# Replacing GenEvent by a Supervisor + GenServer
# http://blog.plataformatec.com.br/2016/11/replacing-genevent-by-a-supervisor-genserver/
#
# using DynamicSupervisor
#
defmodule EmDemo.Application do
@moduledoc false
use Application
def start(_type, _args) do
children = [
{EmDemo.EventManager, []}
]
opts = [strategy: :one_for_one, name: EmDemo.Supervisor]
Supervisor.start_link(children, opts)
end
end
# file: lib/demo.ex
#
defmodule Demo do
def run do
# Find the PID for the EventManager
{:ok, manager_pid} = find_child(EmDemo.Supervisor, EmDemo.EventManager)
# Add the handler processes
token = :done
to_pid = self()
arg_list = [to_pid, token]
handlers = start_handlers([EmDemo.SomeHandler, EmDemo.AnotherHandler], manager_pid, arg_list)
# Generate :some_event and wait for handlers to process it
EmDemo.EventManager.some_event(manager_pid, true)
wait_tokens(token, handlers)
# Generate :another_event and wait for handlers to process it
EmDemo.EventManager.another_event(manager_pid, false)
wait_tokens(token, handlers)
# Remove handler processes
stop_handlers(handlers)
end
defp find_child(supervisor_name, child_id) do
has_id? = fn {id, _, _, _} ->
child_id == id
end
case Enum.find(Supervisor.which_children(supervisor_name), has_id?) do
{_, child_pid, _, _} ->
{:ok, child_pid}
_ ->
{:error, :not_found}
end
end
defp start_handlers(modules, manager_pid, arg_list),
do: List.foldl(modules, [], &start_handler(manager_pid, &1, arg_list, &2))
defp start_handler(manager_pid, handler_module, arg_list, other_pids) do
case EmDemo.EventManager.add_handler(manager_pid, handler_module, arg_list) do
{:ok, pid} ->
[pid | other_pids]
_ ->
IO.puts("Error starting handler process: #{inspect(handler_module)}")
other_pids
end
end
defp wait_tokens(_, []) do
:ok
end
defp wait_tokens(token, [_ | tail]) do
receive do
^token ->
wait_tokens(token, tail)
end
end
defp stop_handlers([]) do
:ok
end
defp stop_handlers([pid | rest]) do
stop_handler(pid)
stop_handlers(rest)
end
defp stop_handler(pid),
do: GenServer.stop(pid)
end
# file: lib/em_demo/event_manager.ex
#
defmodule EmDemo.EventManager do
use DynamicSupervisor
@timeout 30_000
#
# Note: Dynamic Supervisor replaces :simple_one_for_one strategy
# https://hexdocs.pm/elixir/DynamicSupervisor.html#module-migrating-from-supervisors-simple_one_for_one
#
# iex> EmDemo.EventManager.child_spec([:arg])
# %{
# id: EmDemo.EventManager,
# start: {EmDemo.EventManager, :start_link, [[:arg]]},
# type: :supervisor
# }
#
# management functions
#
def start_link(init_arg),
do: DynamicSupervisor.start_link(__MODULE__, init_arg)
@impl true
def init(_init_arg),
do: DynamicSupervisor.init(strategy: :one_for_one)
def stop(sup),
do: stop(sup, DynamicSupervisor.which_children(sup))
def add_handler(sup, handler, init_args \\ []),
do: DynamicSupervisor.start_child(sup, handler_spec(handler, init_args))
#
# Event functions
#
def some_event(sup, payload),
do: notify(sup, {:some_event, payload})
def another_event(sup, payload),
do: notify(sup, {:another_event, payload})
#
# Internal functions
#
defp handler_spec(handler_module, init_args) do
%{
id: GenServer,
start: {GenServer, :start_link, [handler_module, init_args]},
restart: :temporary,
shutdown: 5000,
type: :worker,
modules: [GenServer]
}
end
defp notify(sup, msg),
do: notify_children(msg, DynamicSupervisor.which_children(sup))
defp notify_children(_msg, []) do
:ok
end
defp notify_children(msg, [{_id, child, _type, _modules} | rest]) do
# Cast message to handler process
GenServer.cast(child, msg)
notify_children(msg, rest)
end
defp stop(sup, []) do
DynamicSupervisor.stop(sup)
end
defp stop(sup, [{_id, child, _type, _modules}, rest]) do
GenServer.stop(child, :normal, @timeout)
stop(sup, rest)
end
end
$ iex -S mix
Erlang/OTP 22 [erts-10.4.4] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe] [dtrace]

Compiling 4 files (.ex)
Interactive Elixir (1.9.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> Demo.run()
Elixir.EmDemo.SomeHandler: init([#PID<0.160.0>, :done])
Elixir.EmDemo.AnotherHandler: init([#PID<0.160.0>, :done])
Elixir.EmDemo.SomeHandler #PID<0.162.0>: handle_cast({:some_event, true})
Elixir.EmDemo.AnotherHandler #PID<0.163.0>: handle_cast(_dont_care)
Elixir.EmDemo.SomeHandler #PID<0.162.0>: handle_cast(_dont_care)
Elixir.EmDemo.AnotherHandler #PID<0.163.0>: handle_cast({:another_event, false}) 
Elixir.EmDemo.AnotherHandler #PID<0.163.0>: terminate(:normal, {#PID<0.160.0>, :done})
Elixir.EmDemo.SomeHandler #PID<0.162.0>: terminate(:normal, {#PID<0.160.0>, :done})
:ok
iex(2)> 
# file: lib/em_demo/some_handler.ex
#
defmodule EmDemo.SomeHandler do
use GenServer
@impl true
def init([to_pid, token] = arg) do
IO.puts("#{__MODULE__}: init(#{inspect(arg)})")
{:ok, {to_pid, token}}
end
@impl true
def handle_cast({:some_event, data}, {pid, token} = state) do
IO.puts("#{__MODULE__} #{inspect(self())}: handle_cast({:some_event, #{inspect(data)}})")
Kernel.send(pid, token)
{:noreply, state}
end
def handle_cast(_dont_care, {pid, token} = state) do
IO.puts("#{__MODULE__} #{inspect(self())}: handle_cast(_dont_care)")
Kernel.send(pid, token)
{:noreply, state}
end
@impl true
def terminate(reason, state) do
IO.puts("#{__MODULE__} #{inspect(self())}: terminate(#{inspect(reason)}, #{inspect(state)})")
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.