Skip to content

Instantly share code, notes, and snippets.

@stavro
Last active April 20, 2017 21:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stavro/15308d54b41c6feab371ee09de198b55 to your computer and use it in GitHub Desktop.
Save stavro/15308d54b41c6feab371ee09de198b55 to your computer and use it in GitHub Desktop.

GenStage Error Handling

In this simple example, 10 events are given to a broadcaster. The consumer receives the first event (and only one event), errors out after 5 seconds, and then nothing else happens.

# 14:11:52.771 [debug] Elixir.Stager.Consumer attaching to Elixir.Stager.Broadcaster
# Interactive Elixir (1.4.0) - press Ctrl+C to exit (type h() ENTER for help)

for _ <- 1..10, do: Stager.Broadcaster.sync_notify(:dummy_event)      
# 14:12:30.722 [debug] Elixir.Stager.Consumer received events: [:dummy_event]
#=> [:ok, :ok, :ok, :ok, :ok, :ok, :ok, :ok, :ok, :ok]

# 14:12:35.727 [debug] Elixir.Stager.Consumer attaching to Elixir.Stager.Broadcaster

# 14:12:35.751 [error] GenServer #PID<0.146.0> terminating

** (MatchError) no match of right hand side value: 2
    (stager) lib/stager/consumer.ex:36: Stager.Consumer.handle_event/1
    (stager) lib/stager/consumer.ex:28: anonymous fn/2 in Stager.Consumer.handle_events/3
    (elixir) lib/enum.ex:1755: Enum."-reduce/3-lists^foldl/2-0-"/3
    (stager) lib/stager/consumer.ex:27: Stager.Consumer.handle_events/3
    (gen_stage) lib/gen_stage.ex:2408: GenStage.consumer_dispatch/7
    (gen_stage) lib/gen_stage.ex:1949: GenStage.handle_info/2
    (stdlib) gen_server.erl:615: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:681: :gen_server.handle_msg/5
Last message: {:"$gen_consumer", {#PID<0.145.0>, #Reference<0.0.6.207>}, [:dummy_event]}
State: []
defmodule Stager.Broadcaster do
@moduledoc false
use GenStage
def start_link() do
GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
end
def sync_notify(event, timeout \\ 5000) do
GenStage.call(__MODULE__, {:notify, event}, timeout)
end
## Callbacks
def init(:ok) do
{:producer, {:queue.new, 0}, dispatcher: GenStage.BroadcastDispatcher}
end
def handle_call({:notify, event}, from, {queue, pending_demand}) do
queue = :queue.in({from, event}, queue)
dispatch_events(queue, pending_demand, [])
end
def handle_demand(incoming_demand, {queue, pending_demand}) do
dispatch_events(queue, incoming_demand + pending_demand, [])
end
defp dispatch_events(queue, 0, events) do
{:noreply, Enum.reverse(events), {queue, 0}}
end
defp dispatch_events(queue, demand, events) do
case :queue.out(queue) do
{{:value, {from, event}}, queue} ->
GenStage.reply(from, :ok)
dispatch_events(queue, demand - 1, [event | events])
{:empty, queue} ->
{:noreply, Enum.reverse(events), {queue, demand}}
end
end
end
defmodule Stager.Consumer do
@moduledoc """
The GenEvent handler implementation is a simple consumer.
"""
use GenStage
require Logger
def start_link(opts) do
GenStage.start_link(__MODULE__, opts)
end
# Callbacks
def init(opts) do
{[broadcaster: broadcaster], consumer_state} = Keyword.split(opts, [:broadcaster])
Logger.debug("#{__MODULE__} attaching to #{broadcaster}")
{:consumer, consumer_state, subscribe_to: [broadcaster]}
end
def handle_events(events, _from, state) do
Logger.debug("#{__MODULE__} received events: #{inspect events}")
for event <- events do
handle_event(event)
end
{:noreply, [], state}
end
def handle_event(event) do
# simulate work
:timer.sleep(5000)
# Cause an error
1 = 2
end
end
defmodule Stager.Application do
# See http://elixir-lang.org/docs/stable/elixir/Application.html
# for more information on OTP Applications
@moduledoc false
use Application
def start(_type, _args) do
import Supervisor.Spec, warn: false
# Define workers and child supervisors to be supervised
children = [
worker(Stager.Broadcaster, []),
worker(Stager.Consumer, [[broadcaster: Stager.Broadcaster]]),
]
# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
# for other strategies and supported options
opts = [strategy: :one_for_one, name: Stager.Supervisor]
Supervisor.start_link(children, opts)
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment