Solution to Erlang/Elixir String Performance: Can this be improved?

Usage

Preparation

$ curl http://teralink.net/\~serpent/words.txt > words.txt

$ md5 words.txt 
MD5 (words.txt) = 640ef9e082ef75ef07f0e9869e9d8ae2

$ du -h words.txt
217M	words.txt

Execution

$ elixir ./count.ex ./words.txt --sort 1> out-sorted.txt
Started
Using 4 Workers
Processing: 4.474982s
Load Results: 0.717677s
Order Results: 1.109028s
Print Results: 2.277602s
Total Runtime: 8.583487s

$ elixir ./count.ex ./words.txt 1> out.txt 
Started
Using 4 Workers
Processing: 4.586417s
Load Results: 0.776269s
Print Results: 2.488612s
Total Runtime: 7.855561s

$ elixir ./count-stream.ex < words.txt 1> out.txt
Started
Processing: 4.148846s
Reporting: 0.749119s
Total Runtime: 4.909122s

$ elixir ./count-stream.ex --sort < words.txt 1> out-sorted.txt
Started
Processing: 4.271136s
Reporting: 1.856914s
Total Runtime: 6.137186s

Notes for count.ex

This solution uses multiple workers to perform reads against the same file. It does so by first using File.stat/1 to obtain the size of the file, then generating range tuples (start position and number of bytes to read) that can be farmed out to each worker. Workers are managed with Task.async_stream/3 with no timeout. Furthermore, the concurrency level is set to 50% of all online schedulers, which on an HT-enabled machine gives, by default, one worker per physical CPU core.
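As a rough sketch of the range computation (illustrative only; count.ex below builds the same tuples with a recursive build_ranges helper, and the worker body is elided here):

{:ok, %File.Stat{size: size}} = File.stat("words.txt")

workers_count = round(System.schedulers_online() / 2)   # one worker per physical core on HT machines
chunk_size = round(Float.ceil(size / workers_count))

# {start_position, bytes_to_read} tuples, one per worker
ranges =
  0
  |> Stream.iterate(&(&1 + chunk_size))
  |> Stream.take_while(&(&1 < size))
  |> Enum.map(fn offset -> {offset, min(chunk_size, size - offset)} end)

# each range is then farmed out to a worker, with no timeout
Task.async_stream(ranges, fn {offset, bytes} -> {offset, bytes} end,
  max_concurrency: workers_count, timeout: :infinity)
|> Enum.to_list()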

Instead of using a Stream, which essentially processes the input one character at a time, this solution uses raw file handles in the workers, so the level of concurrency is greater than in a Stream-based solution.

As per the Erlang/OTP documentation on raw mode, this mode is faster because no intermediate Erlang process is needed to handle the file. However, it also means that either :file.read/2 or IO.binread/2 must be used to read from the file.

Reading from the file is done in chunks, with the file itself configured with a read-ahead value. This allows the runtime system to preload data that the code will read later, and avoids spooling too much data at once within the functional layer of the code.
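A minimal sketch of that setup, assuming the 1 MB read-ahead buffer and 1 KB reads used in count.ex (count.ex additionally passes :charlist, so its reads come back as charlists rather than binaries):

# open in raw mode with a read-ahead buffer, seek to this worker's start
# position, then pull fixed-size chunks with IO.binread/2
start_position = 0

{:ok, file} = File.open("words.txt", [:raw, :binary, {:read_ahead, 1_048_576}])
{:ok, _} = :file.position(file, start_position)
chunk = IO.binread(file, 1024)
:ok = File.close(file)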

The problem at hand requires counting words, which essentially requires the program to walk the input stream. This solution uses a simple assumption: any character that is not a newline or a space must be part of a word, so encountering either means a word boundary has been hit. This allows speedy accumulation of in-flight words, one character at a time. Words are accumulated backwards until they need to be reported, at which point the charlist is reversed and the counter in ETS is incremented.
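The counting step relies on :ets.update_counter/4, which does the insert-or-increment in a single call. A minimal sketch with a throwaway table and a literal charlist:

table = :ets.new(:words_count, [:set, :public, {:write_concurrency, true}])

# a word accumulated backwards, one character at a time...
word = Enum.reverse('olleh')                          #=> 'hello'

# ...is counted by incrementing position 2, inserting {word, 0} if absent
:ets.update_counter(table, word, {2, 1}, {word, 0})   #=> 1
:ets.update_counter(table, word, {2, 1}, {word, 0})   #=> 2
:ets.tab2list(table)                                  #=> [{'hello', 2}]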

When the file is read in multiple chunks, a word may be split between two chunks, so there is support for capturing the residual prefix and suffix of each chunk. The prefix and suffix are not reported immediately; instead, the top-level function glues them together and reports them to ETS after the workers have finished their jobs.
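A sketch of the stitching step with toy worker results: each worker returns a {prefix, suffix} pair, and adjacent pairs are glued together (count.ex does this with Enum.chunk_every/4 over the task results):

# three workers, whose boundaries cut "words" and "counted" in half
results = [{nil, 'wor'}, {'ds', 'coun'}, {'ted', []}]

results
|> Enum.chunk_every(2, 1, :discard)
|> Enum.each(fn
  [{_, suffix}, {nil, _}] -> IO.inspect(suffix)
  [{_, suffix}, {prefix, _}] -> IO.inspect(suffix ++ prefix)
end)
# prints the two re-joined words, 'words' and 'counted'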

Observations:

  • The module attributes @bytes_read_ahead and @bytes_read_chunk can be tweaked.

  • Changing the shared ETS table to one ETS table per worker (integrated later on) makes the code more complicated and does not improve performance.

Notes for count-stream.ex

This is a new version created to address Johanna’s comments. The fundamental differences are:

  1. It reads from STDIN with IO.binstream/2 and emits one chunk every @bytes_read_chunk bytes (10 MB by default).
  2. It uses multiple workers to concurrently split and match words.
  3. It reuses the prefix / suffix logic to stitch together words at boundaries and mop them up at top level.
  4. It uses a compiled pattern (via :binary.compile_pattern/1) to split binaries, which is then shared among workers (see the sketch after this list).
  5. It reports the results with a single call to IO.puts/1.
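A minimal sketch of the compiled-pattern split mentioned in point 4, assuming the same separators as count-stream.ex:

# the pattern is compiled once and can be shared by every worker;
# String.split/2 accepts the compiled pattern directly
pattern = :binary.compile_pattern([" ", "\n"])

String.split("to be\nor not to be", pattern)
#=> ["to", "be", "or", "not", "to", "be"]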

Additionally, instead of splitting the data into a known number of chunks each with an unknown size, this version splits the data into an unknown number of chunks each with a known size. This offers several benefits:

  1. We can stick with a single Stream, which carries on producing more chunks.
  2. We can leverage Task.async_stream/3 to manage the processing of each chunk.
  3. Many binary-splitting functions provided by the :binary module can be used safely (some were slow when given a lot of data).

This greatly simplifies supervision logic.
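The resulting pipeline is roughly the following skeleton (word counting and prefix/suffix stitching elided; chunk size and worker count match the defaults in count-stream.ex):

chunk_bytes = 1_048_576 * 10
workers = round(System.schedulers_online() / 2)

:stdio
|> IO.binstream(chunk_bytes)                        # unknown number of fixed-size chunks
|> Task.async_stream(fn chunk -> byte_size(chunk) end,
     ordered: true, max_concurrency: workers, timeout: :infinity)
|> Stream.run()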

Within this version there are two flags:

  • --unicode which toggles between IO.stream/2 and IO.binstream/2.
  • --sort which toggles sorting (ordered by count, descending).
count-stream.ex

defmodule StreamWorker do
  @compile [:native, {:hipe, [:o3]}]

  @bytes_read_chunk 1048576 * 10
  @cores_per_worker 2
  @table_options [:set, :public, {:keypos, 1}, {:write_concurrency, true}]
  @separators [" ", "\n"]
  @profile false

  if @profile do
    defmacrop measure(title, do: block) do
      quote do
        {time, value} = :timer.tc(fn ->
          unquote(block)
        end)
        label = :erlang.float_to_binary(time / 1000000, decimals: 2)
        IO.puts(:stderr, [unquote(title), ": ", label, "s"])
        value
      end
    end
  else
    defmacrop measure(_, do: block) do
      quote do
        unquote(block)
      end
    end
  end

  def start(options) do
    measure "Total Runtime" do
      {:ok, ets_table, task_stream} = measure "Starting" do
        setup(options)
      end
      :ok = measure "Processing" do
        process(task_stream)
      end
      :ok = measure "Reporting" do
        report(ets_table, options)
      end
    end
  end

  defp setup(options) do
    {:ok, ets_table} = setup_ets_table()
    {:ok, input_stream} = setup_input_stream(options)
    {:ok, task_stream} = setup_task_stream(ets_table, input_stream)
    {:ok, transform_stream} = setup_transform_stream(ets_table, task_stream)
    {:ok, ets_table, transform_stream}
  end

  defp setup_ets_table do
    ets_table = :ets.new(:words_count, @table_options)
    {:ok, ets_table}
  end

  defp setup_input_stream(options) do
    cond do
      options[:unicode] -> {:ok, IO.stream(:stdio, @bytes_read_chunk)}
      true -> {:ok, IO.binstream(:stdio, @bytes_read_chunk)}
    end
  end

  defp setup_task_stream(ets_table, input_stream) do
    separator_pattern = :binary.compile_pattern(@separators)
    workers_count = round(System.schedulers_online() / @cores_per_worker)
    task_fun = fn data -> process_chunk(ets_table, separator_pattern, data) end
    task_options = [ordered: true, max_concurrency: workers_count, timeout: :infinity]
    task_stream = Task.async_stream(input_stream, task_fun, task_options)
    {:ok, task_stream}
  end

  defp setup_transform_stream(ets_table, task_stream) do
    reducer = fn {:ok, {prefix, _} = element}, {_, suffix} ->
      case {suffix, prefix} do
        {"", ""} -> nil
        {"", prefix} -> count_word(ets_table, prefix)
        {suffix, ""} -> count_word(ets_table, suffix)
        {suffix, prefix} -> count_word(ets_table, suffix <> prefix)
      end
      {[], element}
    end
    stream = Stream.transform(task_stream, {"", ""}, reducer)
    {:ok, stream}
  end

  defp process(task_stream), do: Stream.run(task_stream)

  defp process_chunk(ets_table, separator_pattern, data) do
    words = String.split(data, separator_pattern)
    process_words(ets_table, words, nil, nil)
  end

  defp process_words(ets_table, [head | rest], nil, suffix) do
    process_words(ets_table, rest, head, suffix)
  end

  defp process_words(ets_table, [suffix], prefix, nil) do
    process_words(ets_table, [], prefix, suffix)
  end

  defp process_words(ets_table, [head | rest], prefix, suffix) do
    count_word(ets_table, head)
    process_words(ets_table, rest, prefix, suffix)
  end

  defp process_words(_, [], prefix, suffix) do
    {prefix, suffix}
  end

  defp report(ets_table, options) when is_list(options), do: report(ets_table, options[:sort])
  defp report(ets_table, false), do: report_load(ets_table) |> report_print()
  defp report(ets_table, true), do: report_load(ets_table) |> report_sort() |> report_print()
  defp report_load(ets_table), do: :ets.tab2list(ets_table)
  defp report_sort(entries), do: List.keysort(entries, 1)
  defp report_format({word, count}), do: [Integer.to_string(count), " ", word, "\n"]
  defp report_print(results), do: results |> Enum.map(&report_format/1) |> IO.puts

  defp count_word(ets_table, word) do
    :ets.update_counter(ets_table, word, {2, 1}, {word, 0})
  end
end

options_argv = System.argv()

StreamWorker.start([
  sort: Enum.member?(options_argv, "--sort"),
  unicode: Enum.member?(options_argv, "--unicode")
])
count.ex

defmodule ConcurrentWorker do
  @bytes_read_ahead 1048576
  @bytes_read_chunk 1024

  def start(file_path, options) do
    IO.puts(:stderr, "Started")
    ets_table = :ets.new(:words_count, [:set, :public, {:keypos, 1}, {:read_concurrency, true}, {:write_concurrency, true}])
    {:ok, stat} = File.stat(file_path)
    workers_count = round(System.schedulers_online() / 2)
    workers_ranges = build_ranges(stat.size, workers_count)
    workers_arguments = for range <- workers_ranges, do: {file_path, ets_table, range}
    worker_fun = fn arg -> start_worker(arg) end
    IO.puts(:stderr, "Using #{workers_count} Workers")
    task_options = [ordered: true, max_concurrency: workers_count, timeout: :infinity]
    task_stream = Task.async_stream(workers_arguments, worker_fun, task_options)
    task_results = measure("Processing", fn -> Enum.to_list(task_stream) end)

    # re-combine words cut off between runs
    task_results |> Enum.chunk_every(2, 1, :discard) |> Enum.each(fn
      [ok: {_, suffix}, ok: {nil, _}] -> report_word(ets_table, suffix)
      [ok: {_, suffix}, ok: {prefix, _}] -> report_word(ets_table, suffix ++ prefix)
      _ -> :ok
    end)

    results = measure("Load Results", fn -> :ets.tab2list(ets_table) end)
    results = maybe_sort(results, options)
    results = measure("Print Results", fn -> Enum.each(results, &print/1) end)
    _ = results
    :ok
  end

  defp maybe_sort(results, options) do
    if Keyword.get(options, :sort, false) do
      measure("Order Results", fn ->
        Enum.sort(results, fn {_, lhs}, {_, rhs} -> lhs > rhs end)
      end)
    else
      results
    end
  end

  def measure(title, fun) do
    {time, value} = :timer.tc(fun)
    IO.puts(:stderr, ["#{title}: #{time / 1000000}s"])
    value
  end

  defp print({word, count}) do
    IO.puts([Integer.to_string(count), " ", word])
  end

  defp build_ranges(file_size, workers_count) do
    chunk_size = round(Float.ceil(file_size / workers_count))
    build_ranges(0, file_size, chunk_size)
  end

  defp build_ranges(bytes_consumed, bytes_remaining, chunk_size, chunks \\ [])
  defp build_ranges(_, 0, _, chunks), do: Enum.reverse(chunks)

  defp build_ranges(bytes_consumed, bytes_remaining, chunk_size, chunks) do
    bytes_to_read = min(chunk_size, bytes_remaining)
    chunk = {bytes_consumed, bytes_to_read}
    to_bytes_consumed = bytes_consumed + bytes_to_read
    to_bytes_remaining = bytes_remaining - bytes_to_read
    build_ranges(to_bytes_consumed, to_bytes_remaining, chunk_size, [chunk | chunks])
  end

  defp start_worker({file_path, ets_table, {start_position, bytes_remaining}}) do
    {:ok, file_handle} = File.open(file_path, [:raw, :charlist, {:read_ahead, @bytes_read_ahead}])
    :file.position(file_handle, start_position)
    consume_range(file_handle, ets_table, bytes_remaining)
  end

  defp consume_range(file_handle, ets_table, bytes_remaining, current_word \\ [], prefix \\ :unknown)

  defp consume_range(file_handle, _, 0, current_word, prefix) do
    :ok = File.close(file_handle)
    {prefix, Enum.reverse(current_word)}
  end

  defp consume_range(file_handle, ets_table, bytes_remaining, current_word, prefix) do
    bytes_read = min(bytes_remaining, @bytes_read_chunk)
    bytes_remaining = bytes_remaining - bytes_read
    data = IO.binread(file_handle, bytes_read)
    chunk_state = {file_handle, ets_table, bytes_remaining}
    consume_chunk(chunk_state, data, current_word, prefix)
  end

  defguardp is_separator(char) when char in [?\n, ?\ ]

  # if the string does not start with a separator then it would contain
  # parts of a word that is cut between 2 workers, in which case the prefix
  # needs to be glued back onto the suffix from the previous worker
  #
  defp split_prefix([head | rest]) when is_separator(head), do: {nil, rest}
  defp split_prefix([head | rest]), do: split_prefix(rest, [head])
  defp split_prefix([head | rest], acc) when is_separator(head), do: {Enum.reverse(acc), rest}
  defp split_prefix([head | rest], acc), do: split_prefix(rest, [head | acc])

  defp consume_chunk(chunk_state, data, [] = current_word, :unknown) do
    {prefix, data} = split_prefix(data)
    consume_chunk(chunk_state, data, current_word, prefix)
  end

  defp consume_chunk(chunk_state, data, current_word, :unknown) do
    consume_chunk(chunk_state, data, current_word, nil)
  end

  defp consume_chunk(chunk_state, [head | rest], [] = current_word, prefix) when is_separator(head) do
    consume_chunk(chunk_state, rest, current_word, prefix)
  end

  defp consume_chunk(chunk_state, [head | rest], current_word, prefix) when is_separator(head) do
    {_, ets_table, _} = chunk_state
    report_word(ets_table, Enum.reverse(current_word))
    consume_chunk(chunk_state, rest, [], prefix)
  end

  defp consume_chunk(chunk_state, [head | rest], current_word, prefix) do
    consume_chunk(chunk_state, rest, [head | current_word], prefix)
  end

  defp consume_chunk(chunk_state, [], current_word, prefix) do
    {file_handle, ets_table, bytes_remaining} = chunk_state
    consume_range(file_handle, ets_table, bytes_remaining, current_word, prefix)
  end

  defp report_word(ets_table, word) do
    :ets.update_counter(ets_table, word, {2, 1}, {nil, 0})
  end
end

[file_path | options] = System.argv()
options = [sort: Enum.member?(options, "--sort")]

ConcurrentWorker.measure("Total Runtime", fn ->
  ConcurrentWorker.start(file_path, options)
end)
llelf commented on May 3, 2019:

You are sorting it not by count

evadne (author) commented on Sep 4, 2019:

Indeed