Skip to content

Instantly share code, notes, and snippets.

@ribomation
Created January 29, 2024 09:45
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save ribomation/a00b11085e54f518bccc59797291c1aa to your computer and use it in GitHub Desktop.
1BRC - Erlang
-module(calc).
-export([start/0, start/1, launch/1]).
-export([
reader_loop/2,
aggregator_loop/2,
writer_loop/0
]).
start() -> launch(filename(data_tiny)).
start(Size) -> launch(filename(Size)).
filename(Size) ->
Suffix = case Size of
data_tiny -> "weather-data-tiny.csv";
data_100k -> "weather-data-100K.csv";
data_1m -> "weather-data-1M.csv";
data_10m -> "weather-data-10M.csv";
data_100m -> "weather-data-100M.csv";
data_1b -> "weather-data-1B.csv"
end,
string:concat("../../data/", Suffix).
launch(Filename) ->
{_, _} = erlang:statistics(wall_clock),
case whereis(calc_avg) of
undefined -> register(calc_avg, self());
Pid when is_pid(Pid) -> ok
end,
{writer, Writer} = writer_start(),
{aggregator, Aggregator} = aggregator_start(Writer),
{reader, Reader} = reader_start(Filename, Aggregator),
io:format("pids: ~p, ~p, ~p\nfilename: ~s\n----\n", [Reader, Aggregator, Writer, Filename]),
receive
{done, Writer} ->
unregister(calc_avg),
{_, Diff} = erlang:statistics(wall_clock),
io:format("----\nelapsed ~.2f seconds, ~s\n", [Diff / 1000, Filename])
end.
%% --- writer ---
writer_start() ->
{writer, spawn(calc, writer_loop, [])}.
writer_loop() ->
receive
{data, ListOfStationAndData} ->
Sorted = lists:keysort(1, ListOfStationAndData),
Print = fun ({Station, {Count, Sum, Min, Max}}) ->
io:format("~ts: ~.2f C, ~.1f/~.1f (~p)~n",
[Station, Sum / Count, Min, Max, Count])
end,
lists:foreach(Print, Sorted),
calc_avg ! {done, self()}
end.
%% --- aggregator ---
aggregator_start(Next) ->
{aggregator, spawn(calc, aggregator_loop, [maps:new(), Next])}.
aggregator_loop(Data, Next) ->
receive
eof ->
Next ! {data, maps:to_list(Data)}
;
{measurement, Station, Temperature} ->
NewData = case maps:get(Station, Data, none) of
none ->
{1, Temperature, Temperature, Temperature};
{Count, Sum, Min, Max} ->
{Count + 1, Sum + Temperature, min(Temperature, Min), max(Temperature, Max)}
end,
aggregator_loop(maps:put(Station, NewData, Data), Next)
end.
%% --- reader ---
reader_start(Filename, Next) ->
File = case file:open(Filename, [read, {encoding, utf8}]) of
{ok, F} -> F;
{error, Reason} ->
io:format("cannot open ~s: ~p", [Filename, Reason]),
exit(kill)
end,
{reader, spawn(calc, reader_loop, [File, Next])}.
reader_loop(File, Next) ->
case io:get_line(File, '') of
eof ->
Next ! eof,
file:close(File);
Line ->
[Station, Txt] = string:split(Line, ";"),
{Temperature, _} = string:to_float(Txt),
Next ! {measurement, Station, Temperature},
reader_loop(File, Next)
end.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment