@eidge
Last active March 10, 2018 16:31
defmodule ExJob.Pipeline do
  @moduledoc """
  A Pipeline represents the workflow for enqueueing and consuming work
  from a single queue.

  Each pipeline can have its own queue, worker pool, and dequeue strategy.
  """

  use Supervisor

  alias ExJob.Pipeline.{Queue, Runner}

  def start_link(job_module, opts \\ []) do
    Supervisor.start_link(__MODULE__, job_module, opts)
  end

  @impl true
  def init(job_module) do
    children = children_for(job_module)
    Supervisor.init(children, strategy: :one_for_one)
  end

  # The Queue gets a name derived from the job module, and the Runner is
  # told to subscribe to that same name.
  defp children_for(job_module) do
    [
      {Queue, name: queue_name(job_module)},
      {Runner, subscribe_to: [queue_name(job_module)]}
    ]
  end

  defp queue_name(job_module), do: String.to_atom("#{job_module}-queue")

  def enqueue(pid, job) do
    queue = queue_stage(pid)
    Queue.enqueue(queue, job)
  end

  # Finds the Queue child by the module list reported by
  # Supervisor.which_children/1. Raises a MatchError if no such child exists.
  defp queue_stage(supervisor) do
    {_, pid, _, _} =
      supervisor
      |> Supervisor.which_children()
      |> Enum.find(fn {_, _, _, modules} -> modules == [Queue] end)

    pid
  end
end
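
The child lookup in `queue_stage/1` can be tried in isolation. Below is a minimal sketch that uses only the standard library: `LookupSketch` and `LookupSketch.FakeQueue` are hypothetical stand-ins (an `Agent` in place of the real `ExJob.Pipeline.Queue`, which is not shown here). Unlike the original, this variant returns `nil` instead of raising when the child is missing or currently restarting, which may or may not be the behaviour you want.

```elixir
# Hypothetical stand-in for the queue process; not part of ExJob.
defmodule LookupSketch.FakeQueue do
  use Agent

  def start_link(opts) do
    Agent.start_link(fn -> [] end, opts)
  end

  # Appends a job to the in-memory list.
  def enqueue(pid, job), do: Agent.update(pid, &(&1 ++ [job]))

  # Returns all jobs enqueued so far, oldest first.
  def jobs(pid), do: Agent.get(pid, & &1)
end

# Hypothetical supervisor with a single child, mirroring the lookup pattern.
defmodule LookupSketch do
  use Supervisor

  def start_link(opts \\ []) do
    Supervisor.start_link(__MODULE__, :ok, opts)
  end

  @impl true
  def init(:ok) do
    Supervisor.init([{LookupSketch.FakeQueue, []}], strategy: :one_for_one)
  end

  # Returns the pid of the FakeQueue child, or nil when it is absent or
  # not running (which_children/1 reports :restarting or :undefined then).
  def queue_pid(supervisor) do
    supervisor
    |> Supervisor.which_children()
    |> Enum.find_value(fn
      {_id, pid, _type, [LookupSketch.FakeQueue]} when is_pid(pid) -> pid
      _other -> nil
    end)
  end
end

{:ok, sup} = LookupSketch.start_link()
queue = LookupSketch.queue_pid(sup)
:ok = LookupSketch.FakeQueue.enqueue(queue, :job_a)
```

Matching on the `modules` element works here because the child spec generated by `use Agent` does not set `:modules`, so the supervisor defaults it to the module in the start function.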