Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
alias Experimental.GenStage
defmodule WorkerPool.Worker do
  @moduledoc """
  A GenStage consumer that runs the jobs it receives from its producers.

  Every job is executed in its own task, started through a `Task.Supervisor`
  with `async_nolink`, so a crashing job does not crash the worker. Jobs are
  handled one at a time: the worker waits until the current job has finished,
  failed or timed out before it starts the next one.

  A job must expose the fields `id`, `job_fun` (a zero-arity function),
  `timeout` (passed to `Task.yield/2`), `observer_pid` and `logger_metadata`.
  The observer process receives exactly one of these messages per job it is
  still alive for:

    * `{:job_finished, id, result}` - `job_fun` returned `result`
    * `{:job_failed, id, reason}` - the job's task exited with `reason`
    * `{:job_timed_out, id, timeout}` - the job did not finish within `timeout`
  """

  @type subscription_options :: Keyword.t
  @type producer :: pid | {pid, subscription_options}

  @doc """
  Starts a worker, subscribed to the provided job producers.

  `producers` is handed to GenStage as the `:subscribe_to` option and `options`
  is forwarded unchanged to `GenStage.start_link/3`, whose result is returned.
  """
  @spec start_link([producer], Keyword.t) :: GenServer.on_start
  def start_link(producers, options \\ []) do
    GenStage.start_link(__MODULE__.Server, producers, options)
  end

  defmodule Server do
    @moduledoc false

    use GenStage
    require Logger

    @doc "Implementation of c:GenStage.init/1."
    def init(producers) do
      # The supervisor is linked to this consumer and kept as its state, so
      # every job task can be started under it.
      {:ok, task_supervisor} = Task.Supervisor.start_link()
      {:consumer, task_supervisor, subscribe_to: producers}
    end

    @doc "Implementation of c:GenStage.handle_events/3."
    def handle_events(jobs, _from, task_supervisor) do
      # Jobs run sequentially; each call blocks until its job is done.
      Enum.each(jobs, &process_job_in_isolated_process(&1, task_supervisor))
      {:noreply, [], task_supervisor}
    end

    # Runs a single job in an unlinked task and waits for its outcome.
    defp process_job_in_isolated_process(job, task_supervisor) do
      task_supervisor
      |> Task.Supervisor.async_nolink(fn -> process_job(job) end)
      |> wait_for_process_completion(job)
    end

    # Waits up to `job.timeout` for the task and reports failures and timeouts
    # to the observer. Success is reported by the task itself.
    defp wait_for_process_completion(task, job) do
      # `Task.yield/2 || Task.shutdown/1` closes the race where the task replies
      # (or crashes) after the yield timed out but before it is shut down:
      # `Task.shutdown/1` then returns that late outcome, so the observer is
      # not told about a timeout for a job that actually finished or failed.
      case Task.yield(task, job.timeout) || Task.shutdown(task) do
        {:ok, :task_finished} -> :ok
        {:exit, reason} -> notify_observer(job, :job_failed, reason)
        nil -> notify_observer(job, :job_timed_out, job.timeout)
      end
    end

    # Executed inside the task process. Always returns `:task_finished`.
    defp process_job(job) do
      Logger.metadata(job.logger_metadata)

      # Skip the work when the observer is no longer there to receive the result.
      if Process.alive?(job.observer_pid) do
        result = job.job_fun.()
        notify_observer(job, :job_finished, result)
      else
        Logger.warn(
          "#{__MODULE__} aborted job #{inspect(job.id)}, " <>
            "since observer #{inspect(job.observer_pid)} is down"
        )
      end

      :task_finished
    end

    # Sends `{notification, job.id, data}` to the job's observer.
    defp notify_observer(job, notification, data) do
      send(job.observer_pid, {notification, job.id, data})
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.