Skip to content

Instantly share code, notes, and snippets.

@vovayartsev
Last active October 7, 2020 13:08
Show Gist options
  • Save vovayartsev/ca04a1d101c95a23dd7d44ad38cd622a to your computer and use it in GitHub Desktop.
Save vovayartsev/ca04a1d101c95a23dd7d44ad38cd622a to your computer and use it in GitHub Desktop.
defmodule Core.Ecto.QueryStream do
  @moduledoc """
  Lazily streams the rows of an Ecto query in fixed-size batches.

  The first batch is fetched by running the query with a `LIMIT`. Every
  following batch is fetched by first passing the query and the last item of
  the previous batch to a caller-supplied function, which is expected to
  narrow the query to the rows that come after that item (keyset pagination),
  and then running the narrowed query with the same `LIMIT`.

  The stream emits each batch as a list and ends when a fetch returns no rows.

  For the pagination to be correct, the query should have a deterministic
  ordering that agrees with the condition applied by the supplied function.

  Example:

      query = from(MyApp.Event, order_by: :id)
      where_fn = fn q, %{id: last_id} -> where(q, [e, ...], e.id > ^last_id) end

      query
      |> Core.Ecto.QueryStream.from_query(where_fn, MyApp.Repo)
      |> Enum.each(fn batch -> process(batch) end)
  """

  import Ecto.Query

  # Batch size used when the caller does not pass `:batch_size`.
  @batch_size 200

  @doc """
  Builds a lazy stream of result batches for `query`.

  ## Parameters

    * `query` - the queryable to page through.
    * `where_fn` - a 2-arity function called as `where_fn.(query, last_item)`
      with the original query and the last item of the previous batch; it must
      return the query restricted to the rows following `last_item`.
    * `repo` - the repo module; `repo.all/1` is called once per batch.
    * `opts` - optional keyword list:
      * `:batch_size` - maximum number of rows per batch, a positive integer
        (default: `#{@batch_size}`).

  ## Returns

  A stream whose elements are non-empty lists of rows. No query is executed
  until the stream is consumed.

  Raises `ArgumentError` if `:batch_size` is not a positive integer.
  """
  @spec from_query(
          Ecto.Queryable.t(),
          (Ecto.Queryable.t(), term() -> Ecto.Queryable.t()),
          module(),
          keyword()
        ) :: Enumerable.t()
  def from_query(query, where_fn, repo, opts \\ []) do
    batch_size = Keyword.get(opts, :batch_size, @batch_size)

    if not (is_integer(batch_size) and batch_size > 0) do
      raise ArgumentError,
            ":batch_size must be a positive integer, got: #{inspect(batch_size)}"
    end

    Stream.resource(
      # The accumulator is tagged so that a legitimate `nil` last item can never
      # be mistaken for "nothing fetched yet" (which would restart the stream
      # from the first batch and loop forever).
      fn -> :start end,
      fn
        :start ->
          fetch_items_and_acc(query, repo, batch_size)

        {:after, last_item} ->
          query
          |> where_fn.(last_item)
          |> fetch_items_and_acc(repo, batch_size)
      end,
      # Nothing to release: each batch is an independent `repo.all/1` call.
      fn _acc -> :ok end
    )
  end

  # Runs `query` limited to `batch_size` rows and translates the result into
  # the `{elements, acc}` / `{:halt, acc}` contract of `Stream.resource/3`.
  defp fetch_items_and_acc(query, repo, batch_size) do
    case repo.all(from(query, limit: ^batch_size)) do
      [] ->
        {:halt, :done}

      items ->
        # Wrapping `items` in a list makes the stream emit the whole batch as a
        # single element; the last row becomes the cursor for the next fetch.
        {[items], {:after, List.last(items)}}
    end
  end
end
# USAGE
import Ecto.Query

# Base query: events ordered by id, which is also the column used as the cursor below.
events_by_id = from(Dice.Events.Event, order_by: :id)

# Cursor condition: restrict the query to the rows that come after the last row
# of the previous batch.
after_last_seen = fn q, %{id: last_id} -> where(q, [e, ...], e.id > ^last_id) end

batches = Core.Ecto.QueryStream.from_query(events_by_id, after_last_seen, Dice.KimRepo)

# The stream emits batches of 200 items, and we can attach indexes to them using Stream.with_index()
for {batch, index} <- Stream.with_index(batches), do: {length(batch), index}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment