Last active
July 14, 2023 15:57
-
-
Save dmkit/8cf8965f0c229624dc0c76629d968f04 to your computer and use it in GitHub Desktop.
[Elixir] [csv] [stream] A sample of how I would parse a big CSV file and insert the data into a database. How would you do it?
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Importer do
  @moduledoc """
  Streams a delimited text file (CSV/TSV) into the database in concurrent batches.
  """

  require Logger

  alias Sample.Repo
  alias Sample.User

  @default_chunk_size 100
  @default_max_concurrency 50
  @default_timeout 5_000

  @doc ~S"""
  Parses the file at `path` and inserts its rows into the database.

  The first line is treated as a header and skipped. Every remaining line is split
  on `del` and must contain exactly four fields — name, age, gender, birthday — in
  that order; lines with any other field count are silently skipped. Valid rows are
  grouped into batches, and each batch is passed to `Repo.insert_all/2` in a task
  started under a `Task.Supervisor` without linking, so a failing or timed-out
  batch is logged instead of crashing the caller.

  Fields are split with plain `String.split/2`: quoted fields (and delimiters inside
  quotes) are NOT supported. Only the line terminator (`\n` or `\r\n`) is stripped,
  so empty leading/trailing fields are preserved even when `del` is `"\t"`.

  ## Options

    * `:chunk_size` - rows per `insert_all` call (default `100`)
    * `:max_concurrency` - maximum number of batches inserted at once (default `50`)
    * `:timeout` - milliseconds a batch may run before its task is killed and the
      failure logged (default `5000`)
    * `:supervisor` - the `Task.Supervisor` to run batches under
      (default `Sender.TaskSupervisor`)

  Returns `:ok` once every batch has finished or failed.

  ## Examples

      Importer.import_csv("/etc/data.csv", "\t")
      #=> :ok
  """
  @spec import_csv(Path.t(), String.t(), keyword()) :: :ok
  def import_csv(path, del \\ ",", opts \\ []) do
    chunk_size = Keyword.get(opts, :chunk_size, @default_chunk_size)
    max_concurrency = Keyword.get(opts, :max_concurrency, @default_max_concurrency)
    timeout = Keyword.get(opts, :timeout, @default_timeout)
    supervisor = Keyword.get(opts, :supervisor, Sender.TaskSupervisor)

    # Stream keeps the file lazily enumerated: only the lines of the batches
    # currently in flight are held in memory.
    batches =
      path
      |> File.stream!()
      # skip the header line
      |> Stream.drop(1)
      |> Stream.map(&parse_line(&1, del))
      |> Stream.reject(&match?({:error, _}, &1))
      |> Stream.chunk_every(chunk_size)

    # `nolink` keeps a crashing batch from taking the caller down, and
    # `on_timeout: :kill_task` does the same for a slow batch (the default
    # `:exit` would exit the caller); both surface below as `{:exit, reason}`.
    # Up to `max_concurrency` batches hit the database at once, so the database
    # is the likely bottleneck — watch it when raising this value.
    Task.Supervisor.async_stream_nolink(supervisor, batches, &bulk_insert/1,
      ordered: false,
      max_concurrency: max_concurrency,
      timeout: timeout,
      on_timeout: :kill_task
    )
    |> Enum.each(fn
      {:ok, _result} -> :ok
      {:exit, reason} -> Logger.error("bulk insert batch failed: #{inspect(reason)}")
    end)
  end

  # Strips only the line terminator (not surrounding whitespace) so an empty
  # first/last field survives when the delimiter is itself whitespace, e.g. "\t".
  defp parse_line(line, del) do
    line
    |> String.trim_trailing("\n")
    |> String.trim_trailing("\r")
    |> String.split(del)
    |> format_csv_row()
  end

  defp format_csv_row([name, age, gender, birthday]) do
    [name: name, age: age, gender: gender, birthday: birthday]
  end

  # Any other field count: tagged so the pipeline can drop it.
  defp format_csv_row(row), do: {:error, row}

  # NOTE(review): values are passed through as raw strings and insert_all does not
  # run changesets — confirm the User schema accepts them (age/birthday may need casting).
  defp bulk_insert(users), do: Repo.insert_all(User, users)
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment