Skip to content

Instantly share code, notes, and snippets.

@Fenntasy
Created March 13, 2019 16:16
Show Gist options
  • Save Fenntasy/1e930da0f7b6c2055a660831d4406a96 to your computer and use it in GitHub Desktop.
Save Fenntasy/1e930da0f7b6c2055a660831d4406a96 to your computer and use it in GitHub Desktop.
defmodule MyProducer do
  @moduledoc """
  A finite `GenStage` producer that emits the elements of an enumerable and
  then stops itself.

  The stage state is the not-yet-emitted remainder of the enumerable. As soon
  as a demand drains that remainder, the producer queues a `:terminate`
  message for itself and shuts down when the message is handled.
  """

  use GenStage

  @doc """
  Starts the producer with `list` (any enumerable, e.g. a list or a range) as
  the events to emit.
  """
  @spec start_link(Enumerable.t()) :: GenServer.on_start()
  def start_link(list) do
    GenStage.start_link(__MODULE__, list)
  end

  @impl true
  def init(list) do
    {:producer, list}
  end

  # Handles the :terminate message queued by handle_demand/2 and stops the
  # stage with reason :shutdown.
  @impl true
  def handle_info(:terminate, state) do
    {:stop, :shutdown, state}
  end

  # Emits up to `demand` events from the front of the state and keeps the rest
  # as the new state.
  @impl true
  def handle_demand(demand, state) when demand > 0 do
    {events, remaining} = Enum.split(state, demand)

    # Check what is left AFTER the split. Checking the state before the split
    # would only schedule termination if yet another demand arrived after the
    # data ran out, which is not guaranteed: a consumer that still has enough
    # outstanding demand never asks again, and the producer would stay alive
    # forever.
    #
    # Per the GenStage docs, async_info/2 queues the message behind the
    # events, so the events returned below are dispatched before the
    # :terminate message is handled.
    if remaining == [] do
      GenStage.async_info(self(), :terminate)
    end

    {:noreply, events, remaining}
  end
end
defmodule MyFlow do
  @moduledoc """
  Builds a small Flow fed by `MyProducer` and runs it to completion.
  """

  @doc """
  Starts a flow from the `MyProducer` child spec, doubles every event with
  `do_something/1`, and returns the result of `Flow.run/1`.
  """
  def run do
    # Alternative spec left by the author: {MyProducer, 1..20}
    producer_specs = [{MyProducer, 1..2}]

    doubled =
      producer_specs
      |> Flow.from_specs(max_demand: 2)
      |> Flow.map(fn event -> do_something(event) end)

    Flow.run(doubled)
  end

  @doc """
  Doubles `number`.
  """
  def do_something(number), do: number * 2
end
defmodule MyTest do
  @moduledoc false

  use ExUnit.Case

  # Runs the whole flow; the test passes only if MyFlow.run/0 returns :ok.
  test "my_flow" do
    outcome = MyFlow.run()

    assert outcome == :ok
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment