Skip to content

Instantly share code, notes, and snippets.

@cblavier
Created November 2, 2019 10:50
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cblavier/c8fc29de8f0249d8f212c606c6285a10 to your computer and use it in GitHub Desktop.
Save cblavier/c8fc29de8f0249d8f212c606c6285a10 to your computer and use it in GitHub Desktop.
GenStage producer streaming / buffering data from an Ecto query. Can be used with Flow.
defmodule Flow.EctoProducer do
  @moduledoc """
  GenStage producer that streams the rows of an Ecto query to its consumers.

  Rows are read in chunks using keyset pagination: every fetch orders the query by the
  configured key and limits it to the chunk size; every fetch after the first one only
  selects rows whose key is strictly greater than the key of the last row already fetched.
  Rows that were fetched but not yet demanded are kept in an in-memory buffer.

  The producer is initialised with a `{repo, query, options}` tuple, for instance:

      {:ok, producer} =
        GenStage.start_link(Flow.EctoProducer, {MyApp.Repo, query, chunk_size: 1_000})

  Once the query is exhausted and the buffer has been drained, the producer stops with
  reason `:normal`.

  ## Options

    * `:chunk_size` - minimal number of rows retrieved by Ecto at once (defaults to `5000`).
      When the pending demand is larger than the chunk size, the demand is used instead.
      The same value is passed to GenStage as the producer's `:buffer_size`.
    * `:key` - field used to sort the rows and to paginate (defaults to `:id`).

  ## Requirements on the query

    * every returned row must be a map or struct exposing the `:key` field, otherwise a
      `KeyError` is raised when the pagination key is read;
    * because pages are delimited with a strict `>` comparison, key values should be unique
      (rows sharing the key of the last row of a chunk would be skipped) and non-nil
      (a `nil` last key is interpreted as "no page fetched yet").
  """

  use GenStage
  import Ecto.Query

  @default_chunk_size 5000
  @default_key :id

  # Builds the initial state from `{repo, query, options}` and registers as a producer.
  @impl true
  def init({repo, query, options}) do
    state = %{
      repo: repo,
      # The caller's query is wrapped in a subquery; the pagination clauses built in
      # base_fetch_query/1 and fetch_from_db/1 are applied to this outer query.
      query: from(subquery(query)),
      buffer: [],
      chunk_size: Keyword.get(options, :chunk_size, @default_chunk_size),
      key: Keyword.get(options, :key, @default_key),
      last_key: nil,
      demand: 0,
      exhausted: false
    }

    {:producer, state, buffer_size: state.chunk_size}
  end

  # Handles demand while the query still has rows to offer.
  # The buffer is served first; if it cannot cover the demand, one synchronous fetch of
  # at least `unmet` rows is issued, which either covers the demand or exhausts the query.
  @impl true
  def handle_demand(incoming_demand, %{exhausted: false} = state) do
    total_demand = incoming_demand + state.demand

    case supply_for_demand(total_demand, state.buffer) do
      {supply, remaining, 0} ->
        {:noreply, supply, %{state | demand: 0, buffer: remaining}}

      # Unmet demand means the whole buffer was consumed, hence the empty remainder.
      {from_buffer, [], unmet} ->
        {fetched, last_key, exhausted} = fetch_for_demand(state, unmet)
        {from_db, remaining, pending} = supply_for_demand(unmet, fetched)

        if exhausted and remaining == [], do: schedule_stop()

        {:noreply, from_buffer ++ from_db,
         %{state | demand: pending, buffer: remaining, last_key: last_key, exhausted: exhausted}}
    end
  end

  # Handles demand when the query is exhausted but the buffer may still hold rows.
  def handle_demand(incoming_demand, %{exhausted: true} = state) do
    {supply, remaining, pending} =
      supply_for_demand(incoming_demand + state.demand, state.buffer)

    if remaining == [], do: schedule_stop()

    {:noreply, supply, %{state | demand: pending, buffer: remaining}}
  end

  # Stops the producer once the query is exhausted and the buffer is empty.
  @impl true
  def handle_info(:stop, state) do
    {:stop, :normal, state}
  end

  # Takes up to `demand` rows from `buffer`.
  # Returns `{supply, remaining_buffer, demand_still_unmet}`.
  defp supply_for_demand(demand, buffer) do
    {supply, remaining} = Enum.split(buffer, demand)
    {supply, remaining, demand - length(supply)}
  end

  # Fetches the next chunk, sized to the larger of `demand` and the configured chunk size.
  # Returns `{rows, last_key, exhausted?}`; a chunk shorter than requested means the query
  # has no more rows.
  defp fetch_for_demand(state, demand) do
    chunk_size = max(demand, state.chunk_size)
    fetched = fetch_from_db(%{state | chunk_size: chunk_size})

    {fetched, next_last_key(fetched, state.key, state.last_key), length(fetched) < chunk_size}
  end

  # First page: no lower bound on the key.
  defp fetch_from_db(%{repo: repo, last_key: nil} = state) do
    state
    |> base_fetch_query()
    |> repo.all()
  end

  # Following pages: only rows located strictly after the last fetched key.
  defp fetch_from_db(%{repo: repo, key: key, last_key: last_key} = state) do
    state
    |> base_fetch_query()
    |> where([r], field(r, ^key) > ^last_key)
    |> repo.all()
  end

  # Orders by the pagination key and limits the result to the chunk size.
  defp base_fetch_query(%{query: query, key: key, chunk_size: chunk_size}) do
    query
    |> order_by([r], field(r, ^key))
    |> limit(^chunk_size)
  end

  # Key of the last fetched row. An empty fetch keeps the previous key so the pagination
  # cursor is never reset; a row lacking the key raises instead of silently yielding nil,
  # which would make the next fetch start over from the first page.
  defp next_last_key([], _key, previous), do: previous

  defp next_last_key(fetched, key, _previous) do
    fetched |> List.last() |> Map.fetch!(key)
  end

  # Asks GenStage to deliver `:stop` to handle_info/2; per the GenStage documentation the
  # message is queued after any events still sitting in the stage's buffer.
  defp schedule_stop do
    GenStage.async_info(self(), :stop)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment