Skip to content

Instantly share code, notes, and snippets.

@enpedasi
Last active April 18, 2018 23:23
Show Gist options
  • Save enpedasi/12598a7edc77c0de47590623f6c84e0b to your computer and use it in GitHub Desktop.
Save enpedasi/12598a7edc77c0de47590623f6c84e0b to your computer and use it in GitHub Desktop.
Elixir GenStage test
# usage:
# GenStageTest.run()
#
defmodule A do
  @moduledoc """
  Producer stage that emits consecutive integers.

  The stage state is the next integer to be emitted.
  """
  use GenStage

  @doc """
  Starts the producer. `number` becomes the initial counter, i.e. the first
  integer that will be emitted.
  """
  def start_link(number), do: GenStage.start_link(__MODULE__, number)

  @impl true
  def init(counter), do: {:producer, counter}

  # Emits exactly `demand` consecutive integers starting at `counter`.
  # Example: with a counter of 3 and a demand of 2 the events are [3, 4]
  # and the new state is 5.
  @impl true
  def handle_demand(demand, counter) when demand > 0 do
    next_counter = counter + demand
    events = Enum.to_list(counter..(next_counter - 1))
    {:noreply, events, next_counter}
  end
end
defmodule B do
  @moduledoc """
  Producer-consumer stage that multiplies every incoming event by a fixed
  number.

  The stage state is the multiplier.
  """
  use GenStage

  @doc """
  Starts the stage. `number` is the factor each event is multiplied by.
  """
  def start_link(number), do: GenStage.start_link(__MODULE__, number)

  @impl true
  def init(number), do: {:producer_consumer, number}

  # Spawns one task per event so the slow multiplications run concurrently,
  # then awaits the tasks in order so the results keep the order of `events`.
  @impl true
  def handle_events(events, _from, number) do
    tasks = for i <- events, do: Task.async(fn -> slow_multiply(i, number) end)
    products = for task <- tasks, do: Task.await(task)
    {:noreply, products, number}
  end

  # Sleeps for three seconds, then returns `i * number`.
  defp slow_multiply(i, number) do
    Process.sleep(3000)
    i * number
  end
end
defmodule C do
  @moduledoc """
  Consumer stage that prints every batch of events it receives.

  The stage state is a placeholder atom and is never read.
  """
  use GenStage

  @doc """
  Starts the consumer.
  """
  def start_link, do: GenStage.start_link(__MODULE__, :ok)

  @impl true
  def init(:ok), do: {:consumer, :the_state_does_not_matter}

  # Pauses for one second, prints the received batch, and emits nothing
  # (the empty event list) because this stage is a consumer.
  @impl true
  def handle_events(batch, _from, state) do
    Process.sleep(1000)
    IO.inspect(batch)
    {:noreply, [], state}
  end
end
defmodule GenStageTest do
  @moduledoc """
  Wires the three demo stages into a pipeline: `A` -> `B` -> `C`.
  """

  @doc """
  Starts one `A`, one `B` and one `C` stage and subscribes them to each other.

  `A` starts counting from 0 and `B` multiplies by 2. Returns the result of
  the last `GenStage.sync_subscribe/2` call.
  """
  def run do
    {:ok, producer} = A.start_link(0)
    {:ok, multiplier} = B.start_link(2)
    {:ok, printer} = C.start_link()

    # The downstream link (C to B) is set up before the upstream one (B to A).
    GenStage.sync_subscribe(printer, to: multiplier)
    GenStage.sync_subscribe(multiplier, to: producer)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment