Skip to content

Instantly share code, notes, and snippets.

@comboy
Created June 2, 2018 21:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save comboy/0989a4b29a054b53210d724c87b0ca24 to your computer and use it in GitHub Desktop.
Save comboy/0989a4b29a054b53210d724c87b0ca24 to your computer and use it in GitHub Desktop.
defmodule A do
use GenStage
def init(counter) do
{:producer, counter}
end
def handle_demand(demand, counter) when demand > 0 do
IO.puts "DEMAND: #{demand}"
# E.g. producer is querying some internet resource for new items, sometimes there are no new items
events = case counter do
0 -> [:a, :b, :c]
1 -> []
2 -> [:d, :e]
_ -> [:oh]
end
{:noreply, events, counter + 1}
end
end
defmodule C do
use GenStage
def init(:ok) do
{:consumer, :the_state_does_not_matter}
end
def handle_events(events, _from, state) do
:timer.sleep(200)
IO.inspect(events)
# We are a consumer, so we would never emit items.
{:noreply, [], state}
end
end
{:ok, a} = GenStage.start_link(A, 0)
{:ok, c} = GenStage.start_link(C, :ok)
GenStage.sync_subscribe(c, to: a, max_demand: 1)
Process.sleep(:infinity)
# Output:
#
# ❯ mix run play.exs
# DEMAND: 1
# [:a]
# [:b]
# [:c]
# DEMAND: 1
# hangs there
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment