@OleMchls
Created July 25, 2018 12:03
gen stage finite demand
defmodule A do
  use GenStage

  def start_link(items) do
    GenStage.start_link(A, items, name: A)
  end

  def init(items) do
    # `items` is a count; the producer state is the finite list 1..items.
    {:producer, Enum.to_list(1..items)}
  end

  def handle_demand(demand, items) when demand > 0 do
    # Take up to `demand` items off the front of the list. If fewer remain,
    # Enum.split/2 returns all of them and an empty list as the new state.
    {events, state} = Enum.split(items, demand)
    IO.inspect(demand, label: "handle_demand DEMAND")
    IO.inspect(length(events), label: "handle_demand length(events)")
    IO.inspect(length(state), label: "handle_demand length(state)")
    {:noreply, events, state}
  end
end
defmodule B do
  use GenStage

  def start_link() do
    GenStage.start_link(B, :ok)
  end

  def init(:ok) do
    {:consumer, :the_state_does_not_matter, subscribe_to: [A]}
  end

  def handle_events(events, _from, state) do
    # Wait for a second.
    # Process.sleep(1000)
    # Inspect the events.
    IO.inspect(events, label: "handle_events")
    # We are a consumer, so we would never emit items.
    {:noreply, [], state}
  end
end
👌 genstage iex -S mix
Erlang/OTP 21 [erts-10.0.2] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:1] [hipe] [dtrace]
Interactive Elixir (1.6.6) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> A.start_link 10
{:ok, #PID<0.169.0>}
iex(2)> B.start_link
{:ok, #PID<0.171.0>}
handle_demand DEMAND: 1000
handle_demand length(events): 10
handle_demand length(state): 0
handle_events: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
iex(3)>
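The transcript above shows a demand of 1000 being answered with only 10 events. The following is a minimal sketch of just the list-splitting step, runnable without GenStage. The module name `FiniteDemandSketch` and its `take/2` function are hypothetical and not part of the gist; they only wrap the same `Enum.split/2` call that `A.handle_demand/2` uses.

```elixir
# Hypothetical helper (not part of the gist) that isolates the
# list-splitting step used in A.handle_demand/2.
defmodule FiniteDemandSketch do
  # Returns {events, remaining_state} for a given demand.
  def take(items, demand) when demand > 0 do
    Enum.split(items, demand)
  end
end

# Demand larger than the remaining items: everything is emitted at once.
{events, state} = FiniteDemandSketch.take(Enum.to_list(1..10), 1000)
IO.inspect(events, label: "events")
IO.inspect(state, label: "state")

# Further demand against the exhausted state yields no events.
IO.inspect(FiniteDemandSketch.take(state, 1000), label: "after exhaustion")
```

Because `Enum.split/2` never fails when asked for more elements than the list holds, the producer can return fewer events than were demanded, and an exhausted list simply produces an empty batch.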