@amokan
Last active September 29, 2016 14:11
defmodule StageB do
  @moduledoc """
  An updated version of Stage (B) from the post at
  https://medium.com/@adammokan/genstage-alternatives-to-parallel-stages-6689f8eabdbd

  Hope this example helps, but understand that I just wrote this on the fly and it may or may not contain bugs.
  I tried to comment thoroughly, so you should have no trouble implementing this on your own.
  """

  alias Experimental.GenStage
  use GenStage

  # This struct will represent our internal state for this stage. For now we just care about our work buffer.
  # You could easily add counts of how many events were received, how many were finished and sent to the next stage, etc.
  defstruct buffer: []
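  # A hypothetical extension of the struct with such counters might look like:
  #
  #   defstruct buffer: [], events_received: 0, events_delivered: 0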

  # Public API

  # Start our GenStage and set the initial state.
  # Note that this uses a named process. If you went with a pid-based approach, be sure to change
  # the other public `GenStage.cast(__MODULE__, ...)` calls below to take the pid instead (see the
  # sketch just below).
  def start_link do
    GenStage.start_link(__MODULE__, %__MODULE__{}, name: __MODULE__)
  end
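  # A pid-based sketch (hypothetical) would drop the `:name` option and pass the pid around instead:
  #
  #   {:ok, pid} = GenStage.start_link(__MODULE__, %__MODULE__{})
  #   GenStage.cast(pid, {:work_finished, result})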
@doc """
This can be called from an external process that is performing the long-running work asynchronously.
`StageB.work_finished(%{some_result: :foo, other_info: [1, 2, 3]})`
"""
def work_finished(result) do
# we will use a cast here since there is no good reason to send a response
# back to a pool-based process that could be dead or doing new work now
GenStage.cast(__MODULE__, {:work_finished, result})
end
@doc """
Force the buffer to be processed without delay
"""
def process_buffer do
GenStage.cast(__MODULE__, :process_buffer)
end
@doc """
Tell the VM to schedule the buffer to be processed after a certain amount of time.
TODO: add a guard to make sure `delay` is numeric.
"""
def process_buffer(delay) do
Process.send_after(self, { :"$gen_cast", :process_pending_work }, delay)
end

  # Private API

  def init(state) do
    {:producer_consumer, state}
  end

  # Callback to handle processing of items in the buffer.
  def handle_cast(:process_pending_work, %__MODULE__{buffer: []} = state), do: {:noreply, [], state}

  def handle_cast(:process_pending_work, %__MODULE__{buffer: buffer} = state) do
    # Assuming we have work in the buffer, attempt to move it to the worker pool.
    # Depending on your implementation, you may want to limit how many things you actually begin processing.
    # `queue_pending_work/1` below handles this, and anything that cannot be queued up
    # now will remain in the buffer until it can be.
    remaining_work = queue_pending_work(buffer)
    {:noreply, [], %__MODULE__{state | buffer: remaining_work}}
  end

  # Callback to handle messages from the worker pool process with the finished result.
  def handle_cast({:work_finished, result}, %__MODULE__{} = state) do
    # If you need to do anything else with the work or keep track of how many
    # things you have delivered/processed, this is the place to do it.
    # Notice that we need to send the data in list form to the next stage, even though
    # we are only handling a single item at the moment.
    {:noreply, [result], state}
  end

  # Events come in here from the previous stage.
  def handle_events(events, _from, %__MODULE__{buffer: buffer} = state) do
    # Defer the processing until after we store the received events in state, just in case we cannot
    # queue everything up right now. You can adjust the delay via the parameter as needed.
    # See the implementation of `process_buffer/1` above.
    process_buffer(500)

    # We return `[]` so that no data is sent to the subscribing stage(s) at this point.
    # THIS WILL STILL CAUSE CONSUMERS TO SEND DEMAND. BE WARNED.
    {:noreply, [], %__MODULE__{state | buffer: buffer ++ events}}
  end

  # No work to do, return an empty buffer.
  defp queue_pending_work([]), do: []

  defp queue_pending_work([work_item | rest] = buffer) do
    # You can optionally check the work queue on your pool and only move items there when it has room.
    # If you do not care and just want to dump everything into the worker pool, feel free to
    # skip this check.
    if queue_limit_reached?() do
      # We do not want to queue more work at the moment, so tell the VM to try processing items
      # from the buffer again in 4 seconds. Otherwise we would not try again until we received
      # more events, which may or may not happen. Return the buffer so the remaining work stays in state.
      process_buffer(4_000)
      buffer
    else
      # Send the `work_item` to the pool to be processed.
      # IMPLEMENT your call here (see the sketch after `queue_limit_reached?/0` below),
      # then continue queueing the rest of the buffer.
      queue_pending_work(rest)
    end
  end

  defp queue_limit_reached? do
    # If you are using any queue mechanism for your worker pool and want to limit the items queued up,
    # this is the place you could check that queue level and return true or false.
    false
  end
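
  # A minimal sketch (not from the original post) of what the "IMPLEMENT your call here" step might
  # look like, assuming a hypothetical `MyApp.WorkerSupervisor` Task.Supervisor started elsewhere in
  # your supervision tree and a hypothetical `MyApp.Worker.perform/1` that does the long-running work.
  # The task reports back through `work_finished/1` when it is done:
  #
  #   defp dispatch_to_pool(work_item) do
  #     Task.Supervisor.start_child(MyApp.WorkerSupervisor, fn ->
  #       result = MyApp.Worker.perform(work_item)
  #       StageB.work_finished(result)
  #     end)
  #   end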
end
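
# A minimal sketch (hypothetical, not from the original post) of wiring this stage into a pipeline,
# assuming `StageA` is the producer and `StageC` is the consumer described in the linked article:
#
#   alias Experimental.GenStage
#
#   {:ok, a} = StageA.start_link()
#   {:ok, b} = StageB.start_link()
#   {:ok, c} = StageC.start_link()
#
#   # StageB pulls events from StageA; StageC subscribes to StageB.
#   {:ok, _} = GenStage.sync_subscribe(b, to: a, max_demand: 10)
#   {:ok, _} = GenStage.sync_subscribe(c, to: b, max_demand: 10)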