Skip to content

Instantly share code, notes, and snippets.

@Trevoke
Last active March 18, 2018 23:22
Show Gist options
  • Save Trevoke/dfa6d886f12d85d7d1d54103d4a808b5 to your computer and use it in GitHub Desktop.
Save Trevoke/dfa6d886f12d85d7d1d54103d4a808b5 to your computer and use it in GitHub Desktop.
producer_consumer does not modify events
# Usage: mix run lib/producer_consumer.exs
#
# Hit Ctrl+C twice to stop it.
#
# This is a base example where a producer A emits items,
# which are amplified by a producer consumer B and printed
# by consumer C.
defmodule A do
use GenStage
def init(counter) do
{:producer, counter}
end
def handle_demand(demand, counter) when demand > 0 do
# If the counter is 3 and we ask for 2 items, we will
# emit the items 3 and 4, and set the state to 5.
events = Enum.to_list(counter..counter+demand-1)
# events = Enum.take(events, 5)
IO.puts "\n\nA ------------------------------"
<> "\ndemand: #{demand}, counter: #{counter}"
# <> "\nevents: #{inspect(events, limit: :infinity)}"
<> "\n[#{List.first(events)} .. #{List.last(events)}], length: #{length(events)}"
{:noreply, events, counter + demand}
end
end
defmodule B do
use GenStage
def init(number) do
{:producer_consumer, number}
end
def handle_events(events, _from, number) do
# If we receive [0, 1, 2], this will transform
# it into [0, 1, 2, 1, 2, 3, 2, 3, 4].
# new_events =
# for event <- events,
# entry <- event..event+number,
# do: entry
new_events = events
tab = " "
IO.puts "\n\n#{tab}B ------------------------------"
# <> "\n#{tab}number: #{number}"
# <> "\nevents: #{inspect(events, limit: :infinity)}"
<> "\n#{tab}[#{List.first(events)} .. #{List.last(events)}], length: #{length(events)}"
# <> "\nnew_events: #{inspect(new_events, limit: :infinity)}"
<> "\n#{tab}[#{List.first(new_events)} .. #{List.last(new_events)}], length: #{length(new_events)}"
{:noreply, new_events, number}
end
end
defmodule C do
use GenStage
def init(:ok) do
{:consumer, :the_state_does_not_matter}
end
def handle_events(events, _from, state) do
tab = " "
IO.puts "\n\n#{tab}#{tab}C 1------------------------------"
# Wait
:timer.sleep(5000) # 10000 is 10 seconds
IO.puts "\n\n#{tab}#{tab}C 2------------------------------"
<> "\n#{tab}#{tab}state: #{state}"
# <> "\nevents: #{inspect(events, limit: :infinity)}"
<> "\n#{tab}#{tab}[#{List.first(events)} .. #{List.last(events)}], length: #{length(events)}"
# We are a consumer, so we would never emit items.
{:noreply, [], state}
end
end
{:ok, a} = GenStage.start_link(A, 0) # starting from zero
{:ok, b} = GenStage.start_link(B, 2) # expand by 2
{:ok, c} = GenStage.start_link(C, :ok) # state does not matter
IO.puts " B 400-700 C 300-500"
GenStage.sync_subscribe(b, to: a, min_demand: 400, max_demand: 700)
GenStage.sync_subscribe(c, to: b, min_demand: 300, max_demand: 500) # max_demand cannot be more than 700
Process.sleep(:infinity)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment