Skip to content

Instantly share code, notes, and snippets.

@wfgilman
Last active February 26, 2018 18:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wfgilman/699ab84cf24a9d59ddb893a21a5d2a79 to your computer and use it in GitHub Desktop.
Save wfgilman/699ab84cf24a9d59ddb893a21a5d2a79 to your computer and use it in GitHub Desktop.
GenStage Pipeline
# Supervisor which starts the pipeline: a Task.Supervisor used by the stages,
# followed by the three GenStage stages in data-flow order.
defmodule Pipeline.Supervisor do
  @moduledoc """
  Top-level supervisor for the job pipeline.

  Children are started in order: `Pipeline.Task.Supervisor` (runs the
  stages' isolated tasks), then `Pipeline.Collector` (producer),
  `Pipeline.Requestor` (producer-consumer) and `Pipeline.Loader` (consumer).
  """
  use Supervisor

  alias Pipeline.{Collector, Loader, Requestor}

  @doc """
  Starts the pipeline supervisor registered as `Pipeline.Supervisor`.

  The optional argument is accepted and ignored so this module can also be
  listed as `Pipeline.Supervisor` / `{Pipeline.Supervisor, []}` in a parent
  tree, whose default child spec calls `start_link/1`. `start_link/0` keeps
  working as before.
  """
  def start_link(_opts \\ []) do
    Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    children = [
      {Task.Supervisor, name: Pipeline.Task.Supervisor},
      stage_spec(Collector),
      stage_spec(Requestor),
      stage_spec(Loader)
    ]

    # :rest_for_one restarts every child started after a crashed one. The
    # stages subscribe to their producer's pid in init/1 (`subscribe_to`), so
    # when a producer restarts its downstream stages must restart and
    # resubscribe; with :one_for_one they would instead crash in cascade and
    # consume the restart budget.
    Supervisor.init(children, strategy: :rest_for_one, max_restarts: 5)
  end

  # The stages only define start_link/0, whereas the default child spec
  # generated by `use GenStage` calls start_link/1, so build it explicitly.
  defp stage_spec(module) do
    %{id: module, start: {module, :start_link, []}}
  end
end
# Stage 1: Collect jobs (events) from the database.
defmodule Pipeline.Collector do
  @moduledoc """
  Producer stage that emits waiting jobs fetched with `Repo.Job.all_waiting/1`.

  Jobs are fetched immediately whenever consumers send demand. While some
  demand remains unmet, the database is re-polled every `@poll_interval` ms.
  At most one poll timer is pending at any time, so repeated demand never
  multiplies the polling rate.
  """
  use GenStage

  @poll_interval 60_000

  @doc "Starts the collector registered as `Pipeline.Collector` with zero initial demand."
  def start_link do
    GenStage.start_link(__MODULE__, 0, name: __MODULE__)
  end

  @impl true
  def init(initial_demand) do
    # `timer` is the reference of the pending poll, or nil when none is armed.
    {:producer, %{demand: initial_demand, timer: nil}}
  end

  @impl true
  def handle_demand(incoming, %{demand: pending} = state) do
    # Serve new demand right away instead of waiting for the next poll tick.
    dispatch_jobs(%{state | demand: pending + incoming})
  end

  @impl true
  def handle_info(:check_for_jobs, state) do
    # dispatch_jobs/1 cancels state.timer: a no-op if it is the timer that just
    # fired, and it drops the newer pending timer if this message is stale.
    dispatch_jobs(state)
  end

  # Fetches up to the outstanding demand, emits the jobs, and arms a single
  # follow-up poll only if demand is still unmet.
  defp dispatch_jobs(state) do
    cancel_poll(state.timer)
    jobs = fetch_jobs(state.demand)
    # Repo.Job.all_waiting/1 presumably returns at most `demand` jobs; if it
    # returns more, `remaining` goes negative and offsets later demand.
    remaining = state.demand - length(jobs)
    {:noreply, jobs, %{state | demand: remaining, timer: schedule_poll(remaining)}}
  end

  # Skip the database round-trip entirely when nobody is asking for jobs.
  defp fetch_jobs(demand) when demand > 0, do: Repo.Job.all_waiting(demand)
  defp fetch_jobs(_demand), do: []

  defp schedule_poll(remaining) when remaining > 0,
    do: Process.send_after(self(), :check_for_jobs, @poll_interval)

  defp schedule_poll(_remaining), do: nil

  defp cancel_poll(nil), do: :ok

  defp cancel_poll(ref) do
    _ = Process.cancel_timer(ref)
    :ok
  end
end
# Stage 2: Make an HTTP request for each job (event). Emits {data, job} pairs.
defmodule Pipeline.Requestor do
  @moduledoc """
  Producer-consumer stage that performs one third-party API request per job.

  Each request runs in its own task under `Pipeline.Task.Supervisor`, started
  with `async_nolink` so a crashing request does not take down this stage.
  All tasks of a batch are started before any is awaited, so the requests run
  concurrently. Successful requests are emitted downstream as `{data, job}`
  tuples; failed or timed-out requests are passed to `handle_failure/1` and
  dropped.
  """
  use GenStage

  alias Pipeline.Collector

  # Referenced by full name rather than aliased, so `Supervisor` is not shadowed.
  @task_supervisor Pipeline.Task.Supervisor
  # Upper bound (ms) on waiting for a whole batch; same as Task.yield/1's default.
  @task_timeout 5_000

  def start_link do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    {:producer_consumer, :ok, subscribe_to: [{Collector, max_demand: 10}]}
  end

  @impl true
  def handle_events(jobs, _from, state) do
    tasks = Enum.map(jobs, &start_task/1)

    # Task.yield_many/2 returns results in the same order as `tasks`.
    events =
      tasks
      |> Task.yield_many(@task_timeout)
      |> Enum.zip(jobs)
      |> Enum.flat_map(fn {{task, result}, job} -> collect_reply(task, result, job) end)

    {:noreply, events, state}
  end

  # Starts the API request for a job under the shared task supervisor.
  defp start_task(_job) do
    Task.Supervisor.async_nolink(@task_supervisor, fn ->
      # TODO: make request to 3rd party API for the job and return its data.
    end)
  end

  # Converts one task outcome into zero or one downstream events.
  defp collect_reply(task, result, job) do
    # nil means the task is still running after the timeout: shut it down so
    # it stops and its late reply never lands in this stage's mailbox.
    case result || Task.shutdown(task, :brutal_kill) do
      {:ok, data} ->
        [{data, job}]

      _exit_or_timeout ->
        # NOTE(review): handle_failure/1 is not defined in this module; it must
        # be defined or imported for this module to compile.
        handle_failure(job)
        []
    end
  end
end
# Stage 3: Load data (events) into the database.
defmodule Pipeline.Loader do
  @moduledoc """
  Consumer stage that loads each `{datum, job}` event into the database.

  Each load runs in its own task under `Pipeline.Task.Supervisor`, started
  with `async_nolink` so a crashing load does not take down this stage. All
  tasks of a batch are started before any is awaited. The job is then passed
  to `put_success/1` or, on crash or timeout, to `handle_failure/1`.
  Events not shaped like `{datum, job}` are skipped.
  """
  use GenStage

  alias Pipeline.Requestor

  # Referenced by full name rather than aliased, so `Supervisor` is not shadowed.
  @task_supervisor Pipeline.Task.Supervisor
  # Upper bound (ms) on waiting for a whole batch; same as Task.yield/1's default.
  @task_timeout 5_000

  def start_link do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    {:consumer, :ok, subscribe_to: [{Requestor, max_demand: 10}]}
  end

  @impl true
  def handle_events(data, _from, state) do
    # The generator pattern silently skips events that are not {datum, job}.
    pending = for {datum, job} <- data, do: {start_task(datum), job}

    # Task.yield_many/2 returns results in the same order as the tasks given.
    pending
    |> Enum.map(fn {task, _job} -> task end)
    |> Task.yield_many(@task_timeout)
    |> Enum.zip(pending)
    |> Enum.each(fn {{task, result}, {_same_task, job}} -> record_reply(task, result, job) end)

    {:noreply, [], state}
  end

  # Starts the database load for one datum under the shared task supervisor.
  defp start_task(_datum) do
    Task.Supervisor.async_nolink(@task_supervisor, fn ->
      # TODO: load datum into database.
    end)
  end

  # Records the outcome of one load task against its job.
  # NOTE(review): put_success/1 and handle_failure/1 are not defined in this
  # module; they must be defined or imported for this module to compile.
  defp record_reply(task, result, job) do
    # nil means the task is still running after the timeout: shut it down so
    # it stops and its late reply never lands in this stage's mailbox.
    case result || Task.shutdown(task, :brutal_kill) do
      {:ok, _} -> put_success(job)
      _exit_or_timeout -> handle_failure(job)
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment