Skip to content

Instantly share code, notes, and snippets.

@FabienHenon
Created September 15, 2017 08:11
Show Gist options
  • Save FabienHenon/991addf27b538d5f01a219b2de3e658a to your computer and use it in GitHub Desktop.
Save FabienHenon/991addf27b538d5f01a219b2de3e658a to your computer and use it in GitHub Desktop.
defmodule ModContact.Import.CsvAdapter do
  @moduledoc """
  GenServer that reads rows from a CSV file.

  The process is driven with plain GenServer messages:

    * `GenServer.call(pid, :worksheet_names)` replies `{:ok, [""]}`.
    * `GenServer.call(pid, {:sample, index, rows_count})` replies
      `{:ok, %{rows: rows, errors: errors}}` or `{:error, {:bad_file, "Invalid file"}}`.
    * `GenServer.cast(pid, {:prepare_stream, index, from})` sends either
      `{:stream_prepared, name, adapter_pid}` or `{:prepare_stream_error, name, reason}`
      to `from`.
    * `GenServer.call(pid, {:stream, chunk_size})` replies `{:ok, {:continue, rows}}` while
      more rows may follow, `{:ok, {:end_of_stream, rows}}` once the file is exhausted, or
      `{:error, reason}`.
    * `GenServer.cast(pid, :close)` stops the process.

  The separator (`,` or `;`) is guessed once, when the process starts.
  """

  use GenServer

  # Error reason reported whenever the file cannot be read or decoded.
  @bad_file {:bad_file, "Invalid file"}

  # Number of leading rows decoded when guessing the separator.
  @sniffed_rows 5

  @doc """
  Starts an adapter for the CSV file located at `path`.

  `name` is only a label: it is echoed back in the messages sent while preparing a stream.
  The process is not registered under it.
  """
  @spec start_link(term(), term()) :: GenServer.on_start()
  def start_link(name, path) do
    GenServer.start_link(__MODULE__, name: name, path: path)
  end

  # ------ Callbacks

  @impl true
  def init(opts) do
    path = Keyword.get(opts, :path)

    {:ok,
     %{
       path: path,
       name: Keyword.get(opts, :name, ""),
       streaming: nil,
       separator: check_separator(path)
     }}
  end

  # There is no worksheet in a CSV file: a single unnamed one is reported.
  @impl true
  def handle_call(:worksheet_names, _from, state) do
    {:reply, {:ok, [""]}, state}
  end

  # Returns a sample made of the first `rows_count` rows of the file.
  # `rows` keeps the file order (a row that failed to decode is replaced by `[]`), whereas
  # `errors` is left in accumulation order, i.e. the last failing row comes first.
  def handle_call(
        {:sample, _index, rows_count},
        _from,
        %{path: path, separator: separator} = state
      ) do
    %{rows: rows, errors: errors} =
      path
      |> File.stream!()
      |> CSV.decode(separator: separator)
      |> Enum.take(rows_count)
      |> Enum.with_index()
      |> Enum.reduce(%{rows: [], errors: []}, &check_for_row_errors/2)

    {:reply, {:ok, %{rows: Enum.reverse(rows), errors: errors}}, state}
  rescue
    # Best effort at the file boundary: any failure is reported as a bad file.
    _ -> {:reply, {:error, @bad_file}, state}
  end

  # Streaming was never prepared (or was reset after a read failure).
  def handle_call({:stream, _chunk_size}, _from, %{streaming: nil} = state) do
    {:reply, {:error, :not_streaming}, state}
  end

  # Hands out the next `chunk_size` raw rows. The read position is kept in the state, so
  # consecutive calls walk through the file instead of re-reading its beginning.
  # NOTE(review): the `{:ok, {:end_of_stream, rows}}` shape mirrors `{:ok, {:continue, rows}}`
  # and the wording of the original documentation - confirm against the caller.
  def handle_call({:stream, chunk_size}, _from, %{streaming: streaming} = state)
      when is_integer(chunk_size) do
    {status, rows, cursor} = next_chunk(streaming, chunk_size)
    {:reply, {:ok, {status, rows}}, %{state | streaming: %{streaming | cursor: cursor}}}
  rescue
    # The stream is lazy, so an unreadable file only fails here, not in :prepare_stream.
    # The cursor cannot be trusted after a failure: streaming has to be prepared again.
    _ -> {:reply, {:error, @bad_file}, %{state | streaming: nil}}
  end

  # Prepares the file for streaming. This is a cast: the requester (`from`) is told with a
  # message when the stream is ready, or why it could not be prepared.
  @impl true
  def handle_cast({:prepare_stream, _index, from}, %{streaming: nil} = state) do
    %{path: path, separator: separator, name: name} = state

    try do
      stream =
        path
        |> File.stream!()
        |> CSV.decode(separator: separator)

      send(from, {:stream_prepared, name, self()})
      {:noreply, %{state | streaming: %{stream: stream, cursor: :start}}}
    rescue
      _ ->
        send(from, {:prepare_stream_error, name, @bad_file})
        {:noreply, %{state | streaming: nil}}
    end
  end

  def handle_cast({:prepare_stream, _index, from}, %{name: name} = state) do
    send(from, {:prepare_stream_error, name, :already_streaming})
    {:noreply, state}
  end

  # Closing adapter
  def handle_cast(:close, state) do
    {:stop, :normal, state}
  end

  # Closes the stream if needed when the process terminates: halting the suspended
  # reduction gives the underlying file stream the chance to run its own cleanup.
  @impl true
  def terminate(_reason, %{streaming: %{cursor: {:suspended, continuation}}} = state) do
    continuation.({:halt, nil})
    state
  end

  def terminate(_reason, state) do
    state
  end

  # ------ Logic

  # Pulls up to `chunk_size` rows and returns `{status, rows, new_cursor}`.
  # The cursor is `:start` (nothing read yet), `{:suspended, continuation}` (reading paused
  # in the middle of the stream) or `:done` (stream exhausted).
  defp next_chunk(%{cursor: :done}, _chunk_size), do: {:end_of_stream, [], :done}

  defp next_chunk(%{cursor: cursor}, chunk_size) when chunk_size < 1 do
    # Nothing was requested: leave the read position untouched.
    {:continue, [], cursor}
  end

  defp next_chunk(%{cursor: :start, stream: stream}, chunk_size) do
    resume(fn command -> Enumerable.reduce(stream, command, &collect_row/2) end, chunk_size)
  end

  defp next_chunk(%{cursor: {:suspended, continuation}}, chunk_size) do
    resume(continuation, chunk_size)
  end

  # Runs the reduction until `chunk_size` rows were collected or the stream ended.
  defp resume(continuation, chunk_size) do
    case continuation.({:cont, {chunk_size, []}}) do
      {:suspended, {_remaining, rows}, next} ->
        {:continue, Enum.reverse(rows), {:suspended, next}}

      {_done_or_halted, {_remaining, rows}} ->
        {:end_of_stream, Enum.reverse(rows), :done}
    end
  end

  # Reducer used while streaming: suspends as soon as the requested chunk is complete.
  defp collect_row(row, {1, rows}), do: {:suspend, {0, [row | rows]}}
  defp collect_row(row, {remaining, rows}), do: {:cont, {remaining - 1, [row | rows]}}

  # Accumulates sampled rows. A row that failed to decode is kept as an empty row and
  # recorded in `errors` under its 1-based row number.
  defp check_for_row_errors({{:ok, row}, _idx}, %{rows: rows} = res) do
    %{res | rows: [row | rows]}
  end

  defp check_for_row_errors({{:error, msg}, idx}, %{rows: rows, errors: errors} = res) do
    %{res | rows: [[] | rows], errors: [{idx + 1, [{1, :row_error, msg}]} | errors]}
  end

  # Guesses the separator: the candidate yielding the most cells over the first rows wins,
  # the comma being preferred on a tie or when the file cannot be read.
  defp check_separator(path) do
    stream = File.stream!(path)
    comma_cells = count_cells(stream, ?,)
    semicolon_cells = count_cells(stream, ?;)

    if comma_cells >= semicolon_cells do
      ?,
    else
      ?;
    end
  rescue
    _ -> ?,
  end

  # Total number of cells found in the first rows when decoding with `separator`.
  # Rows that fail to decode do not count.
  defp count_cells(stream, separator) do
    stream
    |> CSV.decode(separator: separator)
    |> Enum.take(@sniffed_rows)
    |> Enum.reduce(0, fn
      {:ok, cells}, count -> count + length(cells)
      {:error, _}, count -> count
    end)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment