Skip to content

Instantly share code, notes, and snippets.

@mbuhot
Created July 31, 2018 22:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mbuhot/50140aba170ea89620410ce64350d8fb to your computer and use it in GitHub Desktop.
Fast text processing in elixir
defmodule TextProcess do
  @moduledoc """
  Benchmarks two approaches to bulk text processing in Elixir:

    * `process_files_stream/0` — idiomatic `File.stream!`/`Stream` pipeline
      that groups records into an ETS duplicate bag before writing output.
    * `process_files_imperative/0` — raw `:file` loop with read-ahead and
      delayed-write buffering for maximum throughput.

  `generate_files/2` produces the random input data both variants consume.
  """

  @doc """
  Generates `num_files` CSV files under `data/`, each containing
  `lines_per_file` random records produced by `line/0`.

  File names are zero-padded, e.g. `data/test_001.csv`.
  """
  def generate_files(num_files, lines_per_file) do
    Enum.each(1..num_files, fn i ->
      filename = "data/test_#{i |> Integer.to_string() |> String.pad_leading(3, "0")}.csv"

      File.open!(filename, [:write], fn file ->
        Enum.each(1..lines_per_file, fn _ -> IO.puts(file, line()) end)
      end)
    end)
  end

  @doc """
  Builds one random record: six `|`-separated integers followed by five
  `/`-separated random lowercase strings.
  """
  def line() do
    "#{n()}|#{n()}|#{n()}|#{n()}|#{n()}|#{n()}|#{s()}/#{s()}/#{s()}/#{s()}/#{s()}"
  end

  # Random positive integer in 1..100_000_000_000.
  defp n() do
    :rand.uniform(100_000_000_000)
  end

  # Random lowercase string of 1..32 characters.
  # Fix: the original `:rand.uniform(?z - ?a) + ?a` ranged over ?b..?z and
  # could never produce the letter "a"; this covers the full a..z range.
  defp s() do
    1..:rand.uniform(32)
    |> Enum.map(fn _ -> :rand.uniform(26) + ?a - 1 end)
    |> to_string()
  end

  @doc """
  Stream-based variant: parses every `data/*.csv`, groups surviving records
  into an ETS duplicate bag keyed by `rem(b, 1000)`, then writes one sorted
  output file per key. Prints elapsed wall time in milliseconds.
  """
  def process_files_stream do
    start = DateTime.utc_now()
    table = :ets.new(:groups, [:public, :duplicate_bag])

    Path.wildcard("data/*.csv")
    |> Enum.each(&process_file(&1, table))

    # |> Task.async_stream(&process_file(&1, table), ordered: false, timeout: :infinity)
    # |> Stream.run()

    keys(table)
    |> Enum.each(&write_output(table, &1))

    # |> Task.async_stream(&write_output(table, &1), ordered: false, timeout: :infinity)
    # |> Stream.run()

    :ets.delete(table)
    finish = DateTime.utc_now()
    # :millisecond (singular) is the supported System.time_unit/0 value;
    # the plural :milliseconds alias is deprecated and removed in newer Elixir.
    duration = DateTime.diff(finish, start, :millisecond)
    IO.puts("Duration: #{duration} ms")
  end

  # Parses one CSV file and inserts each surviving record into `table`,
  # keyed by `rem(record.b, 1000)`, with the formatted output line as value.
  defp process_file(file, table) do
    file
    |> File.stream!()
    |> process_stream()
    |> Enum.each(fn record ->
      :ets.insert(table, {rem(record.b, 1000), format_output(record)})
    end)
  end

  # Lazily parses lines: skips "#" comment lines, splits each line into
  # 11 fields on "|" and "/", and keeps only records with integer b > 5000.
  # Note: field k retains the trailing newline from the input line; it is
  # never emitted by format_output/1, so this is harmless.
  defp process_stream(input) do
    input
    |> Stream.reject(&String.starts_with?(&1, "#"))
    |> Stream.map(fn line ->
      [a, b, c, d, e, f, g, h, i, j, k] = String.split(line, ["|", "/"])

      %{
        a: a,
        b: String.to_integer(b),
        c: c,
        d: d,
        e: e,
        f: f,
        g: g,
        h: h,
        i: i,
        j: j,
        k: k
      }
    end)
    |> Stream.filter(fn record -> record.b > 5000 end)
  end

  @doc """
  Formats a parsed record as the iodata `a,b,c,d,g/h/i\\n`.

  Returned as an iolist (not a binary) so callers can write it without
  paying for intermediate binary concatenation.
  """
  def format_output(record) do
    [
      record.a,
      ",",
      to_string(record.b),
      ",",
      record.c,
      ",",
      record.d,
      ",",
      record.g,
      "/",
      record.h,
      "/",
      record.i,
      "\n"
    ]
  end

  # Lazily streams every distinct key in the ETS table via first/next.
  defp keys(table) do
    Stream.unfold(:ets.first(table), fn
      :"$end_of_table" -> nil
      key -> {key, :ets.next(table, key)}
    end)
  end

  # Writes all values stored under `key`, sorted, to `data/<key>.out`.
  # File.write!/2 creates the file itself, so the former File.touch! was
  # redundant and has been dropped.
  defp write_output(table, key) do
    output = "data/#{key}.out"
    items = :ets.lookup_element(table, key, 2)
    File.write!(output, Enum.sort(items))
  end

  @doc """
  Imperative variant: processes every `data/*.csv` concurrently via
  `Task.async_stream/2`, each file handled by `process_file_imperative/1`.
  Prints elapsed wall time in milliseconds.
  """
  def process_files_imperative do
    start = DateTime.utc_now()

    Path.wildcard("data/*.csv")
    |> Task.async_stream(&process_file_imperative/1)
    |> Stream.run()

    finish = DateTime.utc_now()
    duration = DateTime.diff(finish, start, :millisecond)
    IO.puts("Duration: #{duration} ms")
  end

  @doc """
  Reads `filename` through a raw handle with a 64 KiB read-ahead buffer and
  writes the filtered, reformatted records to `filename <> ".out"` using
  delayed writes.
  """
  def process_file_imperative(filename) do
    # The pasted source had a mangled interpolation ("#(unknown)");
    # restored to the filename being processed.
    IO.puts("processing #{filename}")

    File.open(filename, [:read, :raw, read_ahead: 64 * 1024], fn input ->
      File.open(filename <> ".out", [:write, :raw, :delayed_write], fn output ->
        loop(input, output)
      end)
    end)

    IO.puts("completed #{filename}")
  end

  # Line-at-a-time loop over the raw handle: skips "#" comment lines,
  # splits into 11 fields on "|" and "/", and writes records with b > 5000
  # as iodata. Any {:error, _} from :file.read_line/1 crashes deliberately.
  defp loop(input, output) do
    case :file.read_line(input) do
      {:ok, "#" <> _} ->
        loop(input, output)

      {:ok, line} ->
        [a, b, c, d, _e, _f, g, h, i, _j, _k] = :binary.split(line, ["|", "/"], [:global])
        ib = :erlang.binary_to_integer(b)

        if ib > 5000 do
          :file.write(output, [a, ",", b, ",", c, ",", d, ",", g, "/", h, "/", i, "\n"])
        end

        loop(input, output)

      :eof ->
        :ok
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment