Skip to content

Instantly share code, notes, and snippets.

@ream88
Created August 31, 2018 09:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ream88/b9002cdac373616c419f9b5e107692dd to your computer and use it in GitHub Desktop.
Save ream88/b9002cdac373616c419f9b5e107692dd to your computer and use it in GitHub Desktop.
GenStage sync and async
# Sample input used by both the sync and the async run below: the range 1..5.
events = 1..5
defmodule SyncOrAsync do
  @moduledoc """
  A `GenStage` `:producer_consumer` that relays events either synchronously or
  asynchronously, depending on the mode it was started with.

    * `:sync`  - events are emitted directly from `handle_events/3`.
    * `:async` - `handle_events/3` emits nothing; one process is spawned per event,
      each reports back with a message, and the event is emitted from `handle_info/2`.

  In `:async` mode the stage stops itself once every spawned worker has reported
  back. The stage cannot know whether upstream will deliver further batches, so if
  all outstanding results drain before the next batch arrives it stops anyway.
  """

  use GenStage

  @doc """
  Starts the stage, registered under the module name, in `mode` (`:sync` or `:async`).

  Any other `mode` raises a `FunctionClauseError`.
  """
  def start_link(mode) when mode in [:sync, :async] do
    GenStage.start_link(__MODULE__, mode, name: __MODULE__)
  end

  # GenStage API

  # `pending` counts async results that were requested but have not been received yet.
  @impl true
  def init(mode), do: {:producer_consumer, %{mode: mode, pending: 0}}

  @impl true
  def handle_events(events, _from, %{mode: :sync} = state) do
    {:noreply, events, state}
  end

  def handle_events(events, _from, %{mode: :async, pending: pending} = state) do
    async_op(events)

    # The workers' replies are only processed after this callback returns, so
    # bumping the counter after spawning them cannot miss a reply.
    {:noreply, [], %{state | pending: pending + length(events)}}
  end

  # Receives one async result and emits it downstream.
  #
  # The stop decision is based on the number of outstanding results rather than on
  # the flag carried by the message: the workers are independent processes, so
  # their messages may arrive in any order, and the flagged message is not
  # necessarily the last one to be handled.
  @impl true
  def handle_info({:event, event, _last?}, %{pending: pending} = state) do
    remaining = pending - 1

    if remaining == 0 do
      stage = self()

      # `GenStage.stop/1` waits for the stage to terminate, so it must be called
      # from another process; the stop is then handled after this callback has
      # returned and `event` has been emitted.
      spawn(fn -> GenStage.stop(stage) end)
    end

    {:noreply, [event], %{state | pending: remaining}}
  end

  @doc """
  Spawns one process per element of the list `events`.

  For every event it prints `Working on <event>` and the spawned process sends
  `{:event, event, last?}` to the calling process, where `last?` is `true` only
  for the element in the final position of `events`. Returns `:ok`.
  """
  def async_op(events) do
    parent = self()
    last_index = length(events) - 1

    events
    |> Enum.with_index()
    |> Enum.each(fn {event, index} ->
      IO.puts("Working on #{event}")

      # Flag by position, not by value, so duplicates of the last value are not
      # mistaken for the final event.
      spawn(fn -> send(parent, {:event, event, index == last_index}) end)
    end)
  end
end
# Sync
# Wrap `events` in a producer, start `SyncOrAsync` in `:sync` mode and subscribe
# it to that producer.
{:ok, producer} = GenStage.from_enumerable(events)
{:ok, sync} = SyncOrAsync.start_link(:sync)
GenStage.sync_subscribe(sync, to: producer)
# Consume the stage as a stream (subscribed with `cancel: :transient`), collect
# everything it emits into a list and print that list.
[{sync, cancel: :transient}]
|> GenStage.stream()
|> Enum.to_list()
|> IO.inspect()
# Async
# Wrap `events` in a fresh producer and start `SyncOrAsync` in `:async` mode.
{:ok, producer} = GenStage.from_enumerable(events)
{:ok, async} = SyncOrAsync.start_link(:async)
# Omitting `cancel: :transient` when subscribing prevents `SyncOrAsync` from
# handling any events, as it will be stopped before the async work is done.
GenStage.sync_subscribe(async, to: producer, cancel: :transient)
# Consume the stage as a stream, collect everything it emits into a list and
# print that list.
[{async, cancel: :transient}]
|> GenStage.stream()
|> Enum.to_list()
|> IO.inspect()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment