Skip to content

Instantly share code, notes, and snippets.

@sescobb27
Forked from narrowtux/repo_stream.ex
Created December 21, 2017 15:16
Show Gist options
  • Save sescobb27/5b4b213883fe5f8f42702d8c411dcee0 to your computer and use it in GitHub Desktop.
Save sescobb27/5b4b213883fe5f8f42702d8c411dcee0 to your computer and use it in GitHub Desktop.
Module for Ecto and GenStage to work together when you want to use Repo.stream in conjunction with a GenStage subscriber or a Flow.
defmodule RepoStream do
defmodule Producer do
use GenStage
defstruct [:demand, :pid]
def start_link() do
GenStage.start_link(__MODULE__, self())
end
def init(caller) do
{:producer, %{
demand: 0,
pid: caller
}, []}
end
def handle_call({:set_receiver, pid}, state) do
{:reply, :ok, [], %{state | pid: pid}}
end
def handle_demand(d, state) do
send state.pid, {:demand, d}
{:noreply, [], %{state | demand: d}}
end
def handle_subscribe(_, _, _, state) do
send state.pid, :ready
{:automatic, state}
end
def handle_info({:supply, items}, state) do
remaining_demand =
(state.demand - length(items))
|> :erlang.max(0)
if remaining_demand > 0 do
send state.pid, {:demand, remaining_demand}
end
{:noreply, items, %{state | demand: remaining_demand}}
end
def terminate(_, state) do
:shutdown
end
end
@doc """
Sets up a producer stage that produces the messages from a repo stream
Returns the producer stage which you can use for flow stuff
"""
def query_into_stage(query, repo) do
chunk_size = 500
stream = repo.stream(query)
pid = self()
Task.async(fn ->
repo.transaction(fn ->
{:ok, producer} = __MODULE__.Producer.start_link()
Process.send_after pid, {:release, producer}, 100
forward(stream, producer)
end)
end)
receive do
{:release, prod} -> prod
end
end
defp forward(stream, producer) do
# type acc :: {acc :: list[any], demand :: positive_integer}
stream
|> Stream.transform(
fn ->
receive do
:ready -> :ok
end
{[], 0}
end,
fn
event, {[], 0} ->
receive do
{:demand, 1} ->
send producer, {:supply, [event]}
{[], {[], 0}}
{:demand, d} ->
{[], {[event], d}}
end
event, {acc, d} when d > length(acc) + 1 ->
{[], {[event | acc], d}}
event, {acc, d} when d == length(acc) + 1 ->
send producer, {:supply, [event | acc]}
{[], {[], 0}}
end,
fn {acc, _} ->
if length(acc) > 0 do
send producer, {:supply, acc}
end
GenStage.stop(producer)
end
)
|> Stream.run()
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment