Skip to content

Instantly share code, notes, and snippets.

@garthk
Created May 15, 2019 06:50
Show Gist options
  • Save garthk/e9106f419338f52c5ef8e8cc0fc2f3b7 to your computer and use it in GitHub Desktop.
Save garthk/e9106f419338f52c5ef8e8cc0fc2f3b7 to your computer and use it in GitHub Desktop.
Overkill With GenStage

Strikes me I'm just re-inventing poolboy, here.

defmodule Queue do
@moduledoc """
Producer at the beginning of a `GenStage` pipeline to adapt push workloads to the pull pattern
being used in the rest of the pipeline. More or less non-blocking as long as there's demand.
Call `push_sync/2` to push an event into the pipeline.
**WARNING:** if you have one consumer and it's working slowly, your worst case delay is as long
as it takes to work through `max_demand - min_demand` events.
"""
use GenStage
@event_deliver [:queue, :deliver]
@event_handle_call [:queue, :queue]
@event_handle_demand [:queue, :demand]
@event_init [:queue, :init]
@event_status [:queue, :status]
@typedoc "An event to queue for the next `GenStage`. Any `t:term/0` will do."
@type event :: term
# An entry in the queue.
@typedoc false
@type entry :: {event(), GenServer.from(), integer()}
# Our state.
@typedoc false
@type state :: {:queue.queue(entry), integer()}
@doc "Starts our process."
@spec start_link(GenServer.options()) :: GenServer.on_start()
def start_link(opts) do
GenStage.start_link(__MODULE__, :ok, [{:name, __MODULE__} | opts])
end
@doc """
Ask the queue to deliver an `t:event/0` to the next `GenStage` in the pipeline, and wait for it.
Returns `{:ok, ms}` when the event is sent to the next stage in response to its demand, where
`ms` is the number of milliseconds between the calls to `c:GenStage.handle_call/3` and
`c:GenStage.reply/2`.
"""
@spec push_sync(event(), integer()) :: {:ok, integer()}
def push_sync(event, timeout \\ 5000) do
GenStage.call(__MODULE__, {:notify, event}, timeout)
end
# https://hexdocs.pm/gen_stage/0.14.1/GenStage.html#c:init/1
@doc false
@spec init(:ok) :: {:producer, {:queue.queue(entry), 0}, [GenStage.producer_option()]}
def init(:ok) do
:telemetry.execute(@event_init, %{})
{:producer, {:queue.new(), 0}, dispatcher: GenStage.DemandDispatcher}
end
# https://hexdocs.pm/gen_stage/0.14.1/GenStage.html#c:handle_call/3
@doc false
@spec handle_call({:notify, event()}, GenServer.from(), state) ::
{:noreply, [GenStage.event()], state}
def handle_call({:notify, event}, from, {queue, demand}) do
:telemetry.execute(@event_handle_call, %{events: 1})
queue = :queue.in({event, from, :os.system_time()}, queue)
dispatch_events({queue, demand}, [])
end
# https://hexdocs.pm/gen_stage/0.14.1/GenStage.html#c:handle_demand/2
@doc false
@spec handle_demand(integer(), state) :: {:noreply, [GenStage.event()], state}
def handle_demand(incoming_demand, {queue, demand}) do
:telemetry.execute(@event_handle_demand, %{events: incoming_demand})
dispatch_events({queue, demand + incoming_demand}, [])
end
@spec dispatch_events(state, [GenStage.event()]) :: {:noreply, [GenStage.event()], state}
defp dispatch_events(state, events)
defp dispatch_events({queue, demand = 0}, events) do
{:noreply, Enum.reverse(events), emitted_status(queue, demand)}
end
defp dispatch_events({queue, demand}, events) do
case :queue.out(queue) do
{{:value, {event, from, queued}}, queue} ->
us = div(:os.system_time() - queued, 1_000)
ms = us / 1000
GenStage.reply(from, {:ok, ms})
:telemetry.execute(@event_deliver, %{events: 1, latency_ms: ms})
dispatch_events({queue, demand}, [event | events])
{:empty, queue} ->
{:noreply, Enum.reverse(events), emitted_status(queue, demand - length(events))}
end
end
@spec emitted_status(:queue.queue(entry), integer()) :: state
defp emitted_status(queue, demand) do
:telemetry.execute(@event_status, %{demand: demand, length: :queue.len(queue)})
{queue, demand}
end
@doc "Document our first arguments to `:telemetry.execute/2`."
@spec events() :: [[atom()]]
def events do
[
@event_init,
@event_handle_call,
@event_handle_demand,
@event_status,
@event_deliver
]
end
end
defmodule Supervisor do
use Supervisor
require Logger
def start_link(:ok) do
Logger.debug("#{__MODULE__}.start_link/1")
Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
end
def init(:ok) do
Logger.debug("#{__MODULE__}.init/1")
config = Config.effective()
senders = Sender.child_specs(config.sender_count, :ok)
Supervisor.init([Queue | senders], strategy: :one_for_one)
end
end
defmodule Sender do
@moduledoc false
use GenStage
require Logger
def start_link(opts) do
Logger.debug("#{__MODULE__}.start_link/1")
GenStage.start_link(__MODULE__, opts)
end
@doc "Returns child specs for a number of `children`, each with the same options."
@spec child_specs(integer(), :ok) :: [Supervisor.child_spec()]
def child_specs(children, :ok) do
for child <- 1..children do
Supervisor.child_spec({__MODULE__, :ok}, id: {__MODULE__, child})
end
end
def init(:ok) do
Logger.debug("#{__MODULE__}.init/1")
consumer_option = [min_demand: 1, max_demand: 2]
{:consumer, :ok, subscribe_to: [{Queue, consumer_option}]}
end
@spec handle_events(any(), any(), any()) :: {:noreply, [], any()}
def handle_events(events, _from, state) do
for event <- events do
:timer.sleep(100)
IO.inspect(event, label: "#{inspect(self())} #{__MODULE__}")
end
{:noreply, [], state}
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment