Skip to content

Instantly share code, notes, and snippets.

@maxim
Created June 19, 2018 04:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save maxim/59628bbe5c034ec5d795a2e2ba01312b to your computer and use it in GitHub Desktop.
defmodule StreamExt do
  @moduledoc """
  Helpers for lazily streaming Ecto query results in batches.
  """

  @doc """
  Stream records from Ecto in batches.

  Supported options are:

    * `:batch_size` - how many rows to fetch at once (default: `1000`)
    * `:strategy` - either `:offset` or `:id` (default: `:offset`)
      * `:offset` - uses SQL OFFSET to fetch pages of results.
        This is slower, but works for any query. The query should carry a
        stable `order_by` of its own, otherwise pages may overlap or skip
        rows between fetches.
      * `:id` - uses `WHERE id > last_id` to fetch pages of results.
        This is faster, but requires the select to be a tuple with an
        incremental id as its first element.
  """
  def ecto(repo, query, options) do
    batch_size = Keyword.get(options, :batch_size, 1000)

    case Keyword.get(options, :strategy, :offset) do
      :offset -> repo |> ecto_in_batches(query, batch_size) |> Stream.concat()
      :id -> repo |> ecto_in_batches_by_id(query, batch_size) |> Stream.concat()
    end
  end

  @doc """
  Stream batches (lists of rows) from an Ecto repo using OFFSET/LIMIT pagination.

  Lazily fetches `batch_size` rows per query; stops after the first batch
  that comes back shorter than `batch_size`.
  """
  def ecto_in_batches(repo, query, batch_size \\ 1000) do
    import Ecto.Query, only: [from: 2]

    Stream.unfold(0, fn
      :done ->
        nil

      offset ->
        results = repo.all(from(_ in query, offset: ^offset, limit: ^batch_size))

        # A short batch means the table is exhausted; emit it and halt.
        if length(results) < batch_size,
          do: {results, :done},
          else: {results, offset + batch_size}
    end)
  end

  @doc """
  Stream batches using keyset ("seek") pagination on `id`.

  Requires the query to select a tuple whose first element is a
  monotonically increasing id. Each page fetches rows with an id greater
  than the last id seen, so no OFFSET scan is needed.
  """
  def ecto_in_batches_by_id(repo, query, batch_size \\ 1000) do
    import Ecto.Query, only: [from: 2]

    # NOTE(review): the initial cursor of -1 assumes ids are >= 0 — confirm
    # against the schemas this is used with.
    Stream.unfold(-1, fn
      :done ->
        nil

      last_id ->
        results =
          repo.all(
            from(
              r in query,
              where: r.id > ^last_id,
              # BUG FIX: without an explicit order the database may return
              # rows in any order, making the "last id" cursor skip or
              # repeat rows between pages.
              order_by: r.id,
              limit: ^batch_size
            )
          )

        if length(results) < batch_size do
          # Short (possibly empty) batch: table exhausted, halt after emit.
          {results, :done}
        else
          # Rows are ordered by id, so the last row carries the cursor.
          {results, results |> List.last() |> elem(0)}
        end
    end)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment