Skip to content

Instantly share code, notes, and snippets.

@zampino
Created May 11, 2017 09:57
Show Gist options
  • Save zampino/d62df703360bc244fe4449109246f371 to your computer and use it in GitHub Desktop.
Save zampino/d62df703360bc244fe4449109246f371 to your computer and use it in GitHub Desktop.
emitter / compressor example
defmodule GenstagePlayground do
defmodule Emitter do
use GenStage
require Logger
defstruct [counter: 0, demand: 0, buffer: []]
def add(), do: GenStage.call(__MODULE__, :in)
def start_link(), do: GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
def init(:ok) do
{:producer, %__MODULE__{}}
end
def handle_demand(num, state) do
taken = Enum.take(state.buffer, num)
new_buf = Enum.slice(state.buffer, num+1, Enum.count(state.buffer))
new_dem = state.demand + num - Enum.count(taken)
Logger.debug "DEMANDING #{num}"
{:noreply, taken, %__MODULE__{state | buffer: new_buf, demand: new_dem}}
end
def handle_call(:in, _from, %__MODULE__{demand: demand} = state)
when demand > 0 do
{:reply, :ok, [0], %__MODULE__{state | demand: demand - 1}}
end
def handle_call(:in, _from, %__MODULE__{demand: demand, buffer: buffer} = state)
when demand == 0 do
{:reply, :ok, [], %__MODULE__{state | buffer: buffer ++ [0]}}
end
end
defmodule Compressor do
use GenStage
require Logger
defstruct [id: nil]
def init(id) do
{:consumer, %__MODULE__{id: id}}
end
def handle_events(ents, _from, state) do
Logger.info "COMPRESSING WITH [#{state.id}] -> #{Enum.count(ents)} EVENTS"
:timer.sleep 1_000
{:noreply, [], state}
end
end
def boot do
{:ok, _pid} = Emitter.start_link
{:ok, pid_1} = GenStage.start_link Compressor, 1
{:ok, pid_2} = GenStage.start_link Compressor, 2
GenStage.sync_subscribe(pid_1, to: Emitter, max_demand: 10, min_demand: 1)
GenStage.sync_subscribe(pid_2, to: Emitter, max_demand: 10, min_demand: 1)
alias GenstagePlayground.Emitter, as: E
IO.puts "booting"
Stream.unfold( 2_000, fn int ->
:timer.sleep(if int > 100, do: int, else: 100)
IO.puts "adding"
E.add()
{int, int - 100}
end) |> Stream.run()
end
end
GenstagePlayground.boot()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment