Skip to content

Instantly share code, notes, and snippets.

@rlb3
Created March 31, 2021 15:23
Show Gist options
  • Save rlb3/57c7dc2f9e10033d389b09cd8d229d08 to your computer and use it in GitHub Desktop.
Save rlb3/57c7dc2f9e10033d389b09cd8d229d08 to your computer and use it in GitHub Desktop.
@doc """
Stream chunks of results from the given queryable.
Unlike Repo.stream, this function does not keep a long running transaction open.
Hence, consistency is not guarenteed in the presence of rows being deleted or sort criteria changing.
## Example
Ecto.Query.from(u in Users, order_by: [asc: :created_at])
|> Repo.chunk(100)
|> Stream.map(&process_batch_of_users)
|> Stream.run()
"""
@spec chunk(Ecto.Queryable.t, integer) :: Stream.t
def chunk(queryable, chunk_size) do
chunk_stream = Stream.unfold(1, fn page_number ->
page = Repo.paginate(queryable, page: page_number, page_size: chunk_size)
{page.entries, page_number + 1}
end)
Stream.take_while(chunk_stream, fn [] -> false; _ -> true end)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment