Created
September 15, 2017 08:11
-
-
Save FabienHenon/991addf27b538d5f01a219b2de3e658a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule ModContact.Import.CsvAdapter do
  @moduledoc """
  GenServer adapter that reads a CSV file for the contact import.

  The field separator (`?,` or `?;`) is guessed once at startup. Rows are decoded
  with `CSV.decode/2`, which yields `{:ok, row}` or `{:error, message}` per row.
  """
  use GenServer

  # Number of leading rows decoded with each candidate separator when guessing it.
  @separator_probe_rows 5

  @doc """
  Starts an adapter for the CSV file at `path`.

  `name` is echoed back in the `:stream_prepared` / `:prepare_stream_error`
  notifications so the receiver can tell adapters apart.
  """
  def start_link(name, path) do
    GenServer.start_link(__MODULE__, [name: name, path: path])
  end

  @impl true
  def init(opts) do
    path = Keyword.get(opts, :path, nil)

    {:ok,
     %{
       path: path,
       name: Keyword.get(opts, :name, ""),
       streaming: nil,
       separator: check_separator(path)
     }}
  end

  # CSV files have no worksheets: report a single unnamed one.
  @impl true
  def handle_call(:worksheet_names, _from, %{path: _path} = state) do
    {:reply, {:ok, [""]}, state}
  end

  # Returns a sample made of the first `rows_count` rows. A row that fails to decode
  # is replaced by `[]` and reported in `errors` as
  # `{row_number, [{1, :row_error, message}]}` (1-based), in ascending row order.
  def handle_call(
        {:sample, _index, rows_count},
        _from,
        %{path: path, separator: separator} = state
      ) do
    reply =
      try do
        %{rows: rows, errors: errors} =
          path
          |> decode_stream(separator)
          |> Enum.take(rows_count)
          |> Enum.with_index()
          |> Enum.reduce(%{rows: [], errors: []}, &check_for_row_errors/2)

        # Both lists were built by prepending; restore file order for each of them.
        {:ok, %{rows: Enum.reverse(rows), errors: Enum.reverse(errors)}}
      rescue
        _ -> {:error, {:bad_file, "Invalid file"}}
      end

    {:reply, reply, state}
  end

  # Streams the data in chunks of raw decoded rows (`{:ok, row}` / `{:error, message}`).
  # Replies `{:ok, {:continue, rows}}` while rows remain after this chunk, and
  # `{:ok, {:end_of_stream, rows}}` for the last (possibly empty) chunk.
  def handle_call({:stream, _chunk_size}, _from, %{streaming: nil} = state) do
    {:reply, {:error, :not_streaming}, state}
  end

  def handle_call(
        {:stream, chunk_size},
        _from,
        %{streaming: %{stream: stream, offset: offset} = streaming} = state
      ) do
    try do
      {status, rows} = next_chunk(stream, offset, chunk_size)
      next_streaming = %{streaming | offset: offset + length(rows)}
      {:reply, {:ok, {status, rows}}, %{state | streaming: next_streaming}}
    rescue
      _ -> {:reply, {:error, {:bad_file, "Invalid file"}}, state}
    end
  end

  # Prepares the file for streaming, then notifies `from` with
  # `{:stream_prepared, name, adapter_pid}` or `{:prepare_stream_error, name, reason}`.
  # Only one stream may be prepared per adapter.
  @impl true
  def handle_cast(
        {:prepare_stream, _index, from},
        %{path: path, separator: separator, name: name, streaming: nil} = state
      ) do
    case open_stream(path, separator) do
      {:ok, stream} ->
        send(from, {:stream_prepared, name, self()})
        {:noreply, %{state | streaming: %{stream: stream, offset: 0}}}

      {:error, reason} ->
        send(from, {:prepare_stream_error, name, reason})
        {:noreply, %{state | streaming: nil}}
    end
  end

  def handle_cast({:prepare_stream, _index, from}, %{name: name, streaming: _} = state) do
    send(from, {:prepare_stream_error, name, :already_streaming})
    {:noreply, state}
  end

  # Stops the adapter normally.
  def handle_cast(:close, state) do
    {:stop, :normal, state}
  end

  # Nothing to release: `File.stream!/1` opens the file only while a stream is being
  # enumerated and closes it afterwards, so no file handle outlives a callback.
  @impl true
  def terminate(_reason, _state), do: :ok

  # ------ Logic

  # Lazily decodes the file at `path` using `separator`.
  defp decode_stream(path, separator) do
    path
    |> File.stream!()
    |> CSV.decode(separator: separator)
  end

  # Builds the decoded stream for `path` and reads its first row, so that a missing
  # or unreadable file is detected now. The stream itself is lazy and would
  # otherwise only fail on the first `:stream` call.
  defp open_stream(path, separator) do
    stream = decode_stream(path, separator)
    _ = Enum.take(stream, 1)
    {:ok, stream}
  rescue
    _ -> {:error, {:bad_file, "Invalid file"}}
  end

  # Returns `{status, rows}` for the `chunk_size` rows that follow the first `offset`.
  # A stream is not a cursor: every enumeration restarts from the top of the file, so
  # previously consumed rows must be skipped explicitly. This re-reads `offset` rows
  # per call; the trade-off keeps the adapter free of long-lived open file handles.
  defp next_chunk(stream, offset, chunk_size) do
    # One extra row is requested to learn whether anything follows this chunk.
    {rows, rest} =
      stream
      |> Stream.drop(offset)
      |> Enum.take(chunk_size + 1)
      |> Enum.split(chunk_size)

    status = if rest == [], do: :end_of_stream, else: :continue
    {status, rows}
  end

  # Accumulates decoded rows; a failed row becomes `[]` plus a 1-based error entry.
  defp check_for_row_errors({{:ok, row}, _idx}, %{rows: rows} = res),
    do: %{res | rows: [row | rows]}

  defp check_for_row_errors({{:error, msg}, idx}, %{rows: rows, errors: errors} = res) do
    %{res | rows: [[] | rows], errors: [{idx + 1, [{1, :row_error, msg}]} | errors]}
  end

  # Guesses the separator by decoding the first rows with `,` and with `;` and keeping
  # the one that yields more cells. Ties and unreadable files default to `,`.
  defp check_separator(path) do
    try do
      stream = File.stream!(path)
      data_comma_count = count_cells(stream, ?,)
      data_semicolon_count = count_cells(stream, ?;)
      if data_comma_count >= data_semicolon_count, do: ?,, else: ?;
    rescue
      _ -> ?,
    end
  end

  # Total number of cells in the first rows decoded with `separator`;
  # rows that fail to decode count as zero.
  defp count_cells(stream, separator) do
    stream
    |> CSV.decode(separator: separator)
    |> Enum.take(@separator_probe_rows)
    |> Enum.reduce(0, fn
      {:ok, cells}, count -> count + Enum.count(cells)
      {:error, _}, count -> count
    end)
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment