[Elixir] [csv] [stream] A sample of how I would parse a big CSV file and insert the data into a database. How would you do it?
defmodule Importer do
  @moduledoc """
  A sample module for importing CSV data.
  """

  alias Sample.Repo
  alias Sample.User

  @doc """
  Parses a CSV file and inserts the rows into the database.

  ## Examples

      iex> Importer.import_csv("/etc/data.csv", "\\t")
      :ok

  """
  def import_csv(path, del \\ ",") do
    # Use Stream for lazy enumeration of the CSV file,
    # so we never load the whole file into memory.
    streams =
      path
      |> File.stream!()
      # drop the header row
      |> Stream.drop(1)
      |> Stream.map(fn row ->
        row
        |> String.trim()
        |> String.split(del)
        |> format_csv_row()
      end)
      # discard rows that failed to parse
      |> Stream.filter(fn
        {:error, _} -> false
        _ -> true
      end)
      # number of rows that will be passed to each bulk_insert/1 call
      |> Stream.chunk_every(100)
    # We don't want to crash the parent process when a task crashes,
    # so we use a no-link async stream under a supervised Task.
    # max_concurrency is set to 50 for now. Depending on how big the
    # data is, this can run up to 50 concurrent processes, each calling
    # bulk_insert/1 to insert 100 rows into the DB, so the DB may become
    # a bottleneck here; better watch it as well.
    # Sample.TaskSupervisor must be started in the application's
    # supervision tree (see the sketch after this module).
    Task.Supervisor.async_stream_nolink(
      Sample.TaskSupervisor,
      streams,
      &bulk_insert/1,
      ordered: false,
      max_concurrency: 50
    )
    |> Enum.each(fn
      {:ok, _} ->
        :ok

      {:exit, reason} ->
        # may want to log this somewhere
        IO.inspect(reason)
    end)
  end
  defp format_csv_row([name, age, gender, birthday]) do
    # Note: these values are still strings; Repo.insert_all/2 does not
    # cast them, so age/birthday may need converting first.
    [name: name, age: age, gender: gender, birthday: birthday]
  end

  defp format_csv_row(row) do
    # may want to log the malformed row somewhere
    {:error, row}
  end
  defp bulk_insert(users) do
    # something like
    Repo.insert_all(User, users)
  end
end
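
For `Task.Supervisor.async_stream_nolink/4` to work, the named supervisor has to be running. A minimal sketch of starting it in the application's supervision tree, assuming a `Sample.Application` callback module and the `Sample.TaskSupervisor` name used above (both names are illustrative, not part of the original gist):

defmodule Sample.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      Sample.Repo,
      # named Task.Supervisor used by Importer.import_csv/2
      {Task.Supervisor, name: Sample.TaskSupervisor}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: Sample.Supervisor)
  end
end

With that in place, `Importer.import_csv("/etc/data.csv", "\t")` can be called from iex or a release task.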
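
If the schema's columns are not all strings, the row values need casting before insert, since `Repo.insert_all/2` does not run changeset casts. A sketch of an alternative `format_csv_row/1` clause, assuming `age` is an integer column and `birthday` an ISO 8601 date (hypothetical types, not stated in the original):

defp format_csv_row([name, age, gender, birthday]) do
  with {age_int, ""} <- Integer.parse(age),
       {:ok, date} <- Date.from_iso8601(birthday) do
    [name: name, age: age_int, gender: gender, birthday: date]
  else
    # fall through to the existing {:error, row} filtering
    _ -> {:error, [name, age, gender, birthday]}
  end
end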