Skip to content

Instantly share code, notes, and snippets.

@andrefreitas
Created November 22, 2018 09:36
Show Gist options
  • Save andrefreitas/64bd049fed7513ad930c683cc7c57f3f to your computer and use it in GitHub Desktop.
Save andrefreitas/64bd049fed7513ad930c683cc7c57f3f to your computer and use it in GitHub Desktop.
Example of using GenStage
defmodule EventsProducer do
  @moduledoc """
  Demo GenStage producer.

  The stage state is the integer `min` passed to `init/1`. Each call to
  `handle_demand/2` emits the list of integers `1..(min + demand)` and logs
  both the requested demand and the number of events actually emitted.
  """
  use GenStage

  # Registers this stage as a producer; `min` is kept unchanged as the state.
  @impl true
  def init(min) do
    {:producer, min}
  end

  # Answers a positive `demand` with `min + demand` events.
  #
  # NOTE(review): this emits more events than were demanded whenever `min > 0`.
  # Presumably that is intentional for the demo (to observe how the surplus is
  # handled) -- confirm before reusing this pattern in real code.
  @impl true
  def handle_demand(demand, min) when demand > 0 do
    # `+` binds tighter than `..`; the parentheses only make that explicit.
    events = Enum.to_list(1..(min + demand))
    IO.puts("---> Demand #{demand} events")
    IO.puts("-> Dispatched #{length(events)} events")
    {:noreply, events, min}
  end
end
defmodule EventsConsumer do
  @moduledoc """
  Demo GenStage consumer.

  Each batch of events is "processed" by sleeping for one second and then
  printing how many events the batch contained. The state (an empty list) is
  never modified.
  """
  use GenStage

  # Registers this stage as a consumer; the init argument is ignored.
  @impl true
  def init(_args) do
    {:consumer, []}
  end

  # Simulates slow work on a batch, logs its size and emits nothing downstream.
  @impl true
  def handle_events(events, _from, state) do
    # Artificial one-second delay so the batching is visible in the output.
    Process.sleep(1000)
    IO.puts("Consumed #{length(events)} events")
    {:noreply, [], state}
  end
end
# Start one producer (state: min = 100) and one consumer.
{:ok, producer} = GenStage.start_link(EventsProducer, 100)
{:ok, consumer} = GenStage.start_link(EventsConsumer, [])

# max_demand -> the most events the consumer will have outstanding with the
#               producer (asked for but not yet processed) at any time.
# min_demand -> once the number of outstanding events drops to this value,
#               the consumer asks the producer for more.
#
# Match on {:ok, _} so a rejected subscription crashes the script right away
# instead of leaving it sleeping forever with nothing flowing.
{:ok, _subscription} =
  GenStage.sync_subscribe(consumer, to: producer, min_demand: 50, max_demand: 100)

# Keep the script process alive so the stages can keep running.
Process.sleep(:infinity)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment