Strikes me I'm just re-inventing poolboy
, here.
Created
May 15, 2019 06:50
-
-
Save garthk/e9106f419338f52c5ef8e8cc0fc2f3b7 to your computer and use it in GitHub Desktop.
Overkill With GenStage
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Queue do | |
@moduledoc """ | |
Producer at the beginning of a `GenStage` pipeline to adapt push workloads to the pull pattern | |
being used in the rest of the pipeline. More or less non-blocking as long as there's demand. | |
Call `push_sync/2` to push an event into the pipeline. | |
**WARNING:** if you have one consumer and it's working slowly, your worst case delay is as long | |
as it takes to work through `max_demand - min_demand` events. | |
""" | |
use GenStage | |
@event_deliver [:queue, :deliver] | |
@event_handle_call [:queue, :queue] | |
@event_handle_demand [:queue, :demand] | |
@event_init [:queue, :init] | |
@event_status [:queue, :status] | |
@typedoc "An event to queue for the next `GenStage`. Any `t:term/0` will do." | |
@type event :: term | |
# An entry in the queue. | |
@typedoc false | |
@type entry :: {event(), GenServer.from(), integer()} | |
# Our state. | |
@typedoc false | |
@type state :: {:queue.queue(entry), integer()} | |
@doc "Starts our process." | |
@spec start_link(GenServer.options()) :: GenServer.on_start() | |
def start_link(opts) do | |
GenStage.start_link(__MODULE__, :ok, [{:name, __MODULE__} | opts]) | |
end | |
@doc """ | |
Ask the queue to deliver an `t:event/0` to the next `GenStage` in the pipeline, and wait for it. | |
Returns `{:ok, ms}` when the event is sent to the next stage in response to its demand, where | |
`ms` is the number of milliseconds between the calls to `c:GenStage.handle_call/3` and | |
`c:GenStage.reply/2`. | |
""" | |
@spec push_sync(event(), integer()) :: {:ok, integer()} | |
def push_sync(event, timeout \\ 5000) do | |
GenStage.call(__MODULE__, {:notify, event}, timeout) | |
end | |
# https://hexdocs.pm/gen_stage/0.14.1/GenStage.html#c:init/1 | |
@doc false | |
@spec init(:ok) :: {:producer, {:queue.queue(entry), 0}, [GenStage.producer_option()]} | |
def init(:ok) do | |
:telemetry.execute(@event_init, %{}) | |
{:producer, {:queue.new(), 0}, dispatcher: GenStage.DemandDispatcher} | |
end | |
# https://hexdocs.pm/gen_stage/0.14.1/GenStage.html#c:handle_call/3 | |
@doc false | |
@spec handle_call({:notify, event()}, GenServer.from(), state) :: | |
{:noreply, [GenStage.event()], state} | |
def handle_call({:notify, event}, from, {queue, demand}) do | |
:telemetry.execute(@event_handle_call, %{events: 1}) | |
queue = :queue.in({event, from, :os.system_time()}, queue) | |
dispatch_events({queue, demand}, []) | |
end | |
# https://hexdocs.pm/gen_stage/0.14.1/GenStage.html#c:handle_demand/2 | |
@doc false | |
@spec handle_demand(integer(), state) :: {:noreply, [GenStage.event()], state} | |
def handle_demand(incoming_demand, {queue, demand}) do | |
:telemetry.execute(@event_handle_demand, %{events: incoming_demand}) | |
dispatch_events({queue, demand + incoming_demand}, []) | |
end | |
@spec dispatch_events(state, [GenStage.event()]) :: {:noreply, [GenStage.event()], state} | |
defp dispatch_events(state, events) | |
defp dispatch_events({queue, demand = 0}, events) do | |
{:noreply, Enum.reverse(events), emitted_status(queue, demand)} | |
end | |
defp dispatch_events({queue, demand}, events) do | |
case :queue.out(queue) do | |
{{:value, {event, from, queued}}, queue} -> | |
us = div(:os.system_time() - queued, 1_000) | |
ms = us / 1000 | |
GenStage.reply(from, {:ok, ms}) | |
:telemetry.execute(@event_deliver, %{events: 1, latency_ms: ms}) | |
dispatch_events({queue, demand}, [event | events]) | |
{:empty, queue} -> | |
{:noreply, Enum.reverse(events), emitted_status(queue, demand - length(events))} | |
end | |
end | |
@spec emitted_status(:queue.queue(entry), integer()) :: state | |
defp emitted_status(queue, demand) do | |
:telemetry.execute(@event_status, %{demand: demand, length: :queue.len(queue)}) | |
{queue, demand} | |
end | |
@doc "Document our first arguments to `:telemetry.execute/2`." | |
@spec events() :: [[atom()]] | |
def events do | |
[ | |
@event_init, | |
@event_handle_call, | |
@event_handle_demand, | |
@event_status, | |
@event_deliver | |
] | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Supervisor do | |
use Supervisor | |
require Logger | |
def start_link(:ok) do | |
Logger.debug("#{__MODULE__}.start_link/1") | |
Supervisor.start_link(__MODULE__, :ok, name: __MODULE__) | |
end | |
def init(:ok) do | |
Logger.debug("#{__MODULE__}.init/1") | |
config = Config.effective() | |
senders = Sender.child_specs(config.sender_count, :ok) | |
Supervisor.init([Queue | senders], strategy: :one_for_one) | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Sender do | |
@moduledoc false | |
use GenStage | |
require Logger | |
def start_link(opts) do | |
Logger.debug("#{__MODULE__}.start_link/1") | |
GenStage.start_link(__MODULE__, opts) | |
end | |
@doc "Returns child specs for a number of `children`, each with the same options." | |
@spec child_specs(integer(), :ok) :: [Supervisor.child_spec()] | |
def child_specs(children, :ok) do | |
for child <- 1..children do | |
Supervisor.child_spec({__MODULE__, :ok}, id: {__MODULE__, child}) | |
end | |
end | |
def init(:ok) do | |
Logger.debug("#{__MODULE__}.init/1") | |
consumer_option = [min_demand: 1, max_demand: 2] | |
{:consumer, :ok, subscribe_to: [{Queue, consumer_option}]} | |
end | |
@spec handle_events(any(), any(), any()) :: {:noreply, [], any()} | |
def handle_events(events, _from, state) do | |
for event <- events do | |
:timer.sleep(100) | |
IO.inspect(event, label: "#{inspect(self())} #{__MODULE__}") | |
end | |
{:noreply, [], state} | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment