Created
January 18, 2024 12:03
-
-
Save ribomation/69a7f59a5b18ef8d7c8477b0ce2108fb to your computer and use it in GitHub Desktop.
Ambitious optimization of 1BRC using Modern C++
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include <ostream> | |
#include <algorithm> | |
#include <limits> | |
#include <format> | |
namespace ribomation::brc { | |
/// Running statistics (count, sum, min, max) for one station's temperatures.
///
/// A default-constructed object is the identity element for merging: its
/// `min` is the largest finite double and its `max` the most negative finite
/// double, so the first sample (or merged aggregate) always replaces both.
struct Aggregation {
    unsigned count = 0;
    double   sum   = 0;
    double   min   = std::numeric_limits<double>::max();
    // NOTE: numeric_limits<double>::min() is the smallest *positive* value, which
    // would mask every negative reading; lowest() is the true lower bound.
    double   max   = std::numeric_limits<double>::lowest();

    /// Folds a single temperature sample into the statistics.
    void operator+=(double t) {
        ++count;
        sum += t;
        min = std::min(min, t);
        max = std::max(max, t);
    }

    /// Merges another aggregate (e.g. from a different worker) into this one.
    void operator+=(Aggregation const& a) {
        count += a.count;
        sum += a.sum;
        min = std::min(min, a.min);
        max = std::max(max, a.max);
    }
};
/// Writes an aggregate as "<mean>C, <min>/<max> (<count>)".
/// The mean is `sum / count`; signs are always printed and the count uses the
/// locale-aware integer presentation.
inline auto operator<<(std::ostream& os, Aggregation const& a) -> std::ostream& {
    auto const mean = a.sum / a.count;
    os << std::format("{:+.2f}C, {:+.1f}/{:+.1f} ({:Ld})", mean, a.min, a.max, a.count);
    return os;
}
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <iostream> | |
#include <unordered_map> | |
#include <vector> | |
#include <thread> | |
#include "util.hxx" | |
#include "memory-mapped-file.hxx" | |
#include "aggregation.hxx" | |
#include "worker.hxx" | |
namespace rm = ribomation::util; | |
namespace io = ribomation::io; | |
namespace br = ribomation::brc; | |
using std::cout; | |
using std::string_view; | |
using std::pair; | |
using std::vector; | |
using std::unordered_map; | |
inline void | |
split_chunks(unsigned num_workers, string_view& data, vector<br::Worker>& workers, vector<br::MapHeap>& map_heap) { | |
auto const chunk_size = data.size() / num_workers; | |
cout << "chunk size: " << chunk_size * 1E-6 << " MB\n"; | |
auto start = 0UL; | |
for (auto id = 0U; id < num_workers; ++id) { | |
auto size = chunk_size; | |
while (data[start + size] != '\n') ++size; //divide at newline | |
auto chunk = data.substr(start, size); | |
start += size + 1; //start at next row | |
workers.emplace_back(chunk, map_heap[id]); | |
} | |
} | |
inline void | |
launch_workers(unsigned num_workers, vector<br::Worker>& workers) { | |
cout << "launching " << num_workers << " worker threads...\n"; | |
auto threads = std::vector<std::jthread>{}; | |
threads.reserve(num_workers); | |
for (auto id = 0U; id < num_workers; ++id) { | |
threads.emplace_back(&br::Worker::run, &workers[id]); | |
} | |
} | |
inline auto | |
collect_result(vector<br::Worker>& workers) -> unordered_map<string_view, br::Aggregation> { | |
cout << "collecting results\n------\n"; | |
auto result = unordered_map<string_view, br::Aggregation>{}; | |
for (auto&& w: workers) { | |
for (auto&& [station, aggr]: w.data) { | |
result[station] += aggr; | |
} | |
} | |
return result; | |
} | |
inline void | |
sort_print(unordered_map<string_view, br::Aggregation>& result) { | |
auto sorted = vector<pair<string_view, br::Aggregation>>{result.begin(), result.end()}; | |
std::sort(sorted.begin(), sorted.end(), [](auto&& lhs, auto&& rhs) { | |
return lhs.first < rhs.first; | |
}); | |
for (auto&& [station, aggr]: sorted) { | |
cout << station << ": " << aggr << "\n"; | |
} | |
} | |
int main(int argc, char** argv) { | |
auto filename = rm::getFilename(argc, argv); | |
cout << "filename: " << filename << "\n"; | |
rm::elapsed([filename]() { | |
auto file = io::MemoryMappedFile{filename}; | |
auto data = file.data(); | |
cout << "loaded " << data.size() * 1E-6 << " MB\n"; | |
auto const T = std::thread::hardware_concurrency(); | |
auto mapHeap = vector<br::MapHeap>(T); | |
auto workers = vector<br::Worker>{}; | |
workers.reserve(T); | |
split_chunks(T, data, workers, mapHeap); | |
launch_workers(T, workers); | |
auto result = collect_result(workers); | |
sort_print(result); | |
}); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include <ostream>
#include <stdexcept>
#include <string_view>
#include "fast_double_parser.h"
namespace ribomation::brc { | |
using std::string_view; | |
using std::ostream; | |
/// One parsed input record: a station name and its temperature reading.
/// `station` is a non-owning view, so it is only valid while the text it was
/// taken from stays alive.
struct Measurement {
    std::string_view station;
    double           temperature = 0.0;
};
/// Parses the record "<station>;<temperature>\n" that begins at offset `start`
/// in `chunk` and advances `start` to the beginning of the next record.
///
/// The final record of a chunk need not be newline-terminated; in that case
/// `start` ends up one past `chunk.size()`.
///
/// @param chunk text containing the records
/// @param start in: offset of the record to parse; out: offset of the next one
/// @return the station (a view into `chunk`) and its temperature
/// @throws std::invalid_argument if no ';' separator exists at or after `start`.
///         Previously this case wrapped `start` around to 0, which made the
///         caller's parse loop spin forever on e.g. a trailing blank line.
inline auto extract(string_view chunk, unsigned long& start) -> Measurement {
    auto const semi_colon = chunk.find(';', start);
    if (semi_colon == string_view::npos) {
        start = chunk.size(); // consume the malformed tail so a caller that continues cannot loop
        throw std::invalid_argument{"malformed record: missing ';' separator"};
    }
    auto const station = chunk.substr(start, semi_colon - start);

    auto const temp_begin = semi_colon + 1;
    auto const nl         = chunk.find('\n', temp_begin);
    // When no newline follows, nl is npos and substr clamps to the end of chunk.
    auto const temp_text = chunk.substr(temp_begin, nl - temp_begin);
    start = temp_begin + temp_text.size() + 1;

    double temperature{};
    // NOTE(review): temp_text is not NUL-terminated; this presumably relies on the
    // parser stopping at the first non-numeric character - confirm against
    // fast_double_parser. The returned pointer (parse status) is ignored.
    [[maybe_unused]] auto ptr = fast_double_parser::parse_number(temp_text.data(), &temperature);
    return {station, temperature};
}
/// Debug-prints a measurement as "Measurement{<station>, <temperature>}".
inline auto operator<<(ostream& os, Measurement const& m) -> ostream& {
    os << "Measurement{" << m.station;
    os << ", " << m.temperature << "}";
    return os;
}
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <iostream> | |
#include <string> | |
#include <chrono> | |
#include "util.hxx" | |
namespace ribomation::util { | |
using namespace std::string_literals; | |
namespace cr = std::chrono; | |
using std::string; | |
/// Invokes `stmts` once and reports its wall-clock duration, in seconds with
/// millisecond precision, on stderr.
///
/// Uses the monotonic steady_clock: high_resolution_clock may alias the system
/// clock, which can be adjusted while the measurement is running.
///
/// @param stmts the code to time; any exception it throws propagates and no
///              timing line is printed
void elapsed(std::function<void()> const& stmts) {
    using clock = cr::steady_clock;

    auto const startTime = clock::now();
    stmts();
    auto const endTime = clock::now();

    auto const seconds = cr::duration<double, std::ratio<1, 1>>{endTime - startTime};
    std::cerr << "------\n" << std::format("Elapsed time: {:.3f} seconds\n", seconds.count());
}
/// Determines the input file from the command line.
///
/// Recognised arguments: `-f <path>`. Without it the default
/// "data/weather-data-1M.csv" is returned; if given several times the last one
/// wins.
///
/// @param argc argument count as passed to main
/// @param argv argument vector as passed to main
/// @return the selected file name
/// @throws std::invalid_argument after printing a usage line to stderr, when an
///         unknown argument is seen or `-f` is not followed by a value
auto getFilename(int argc, char** argv) -> std::string {
    auto filename = std::string{"data/weather-data-1M.csv"};
    for (auto k = 1; k < argc; ++k) {
        auto const arg = std::string{argv[k]};
        // `-f` must be followed by a value; otherwise argv[++k] would be the
        // terminating null pointer, and assigning that to a std::string is UB.
        if (arg == "-f" && k + 1 < argc) {
            filename = argv[++k];
        } else {
            std::cerr << "usage: " << argv[0] << " [-f <str>]\n";
            throw std::invalid_argument{"usage"};
        }
    }
    return filename;
}
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include <functional> | |
#include <string> | |
namespace ribomation::util {
    /// Runs `stmts` once and prints the elapsed wall-clock time in seconds to stderr.
    void elapsed(std::function<void()> const& stmts);

    /// Returns the input file name selected by `-f <path>` on the command line,
    /// or the built-in default when the option is absent. Prints a usage line
    /// and throws std::invalid_argument on an unrecognised argument.
    std::string getFilename(int argc, char** argv);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include <iostream> | |
#include <string_view> | |
#include <array> | |
#include <memory_resource> | |
#include <unordered_map> | |
#include "measurement.hxx" | |
#include "aggregation.hxx" | |
namespace ribomation::brc { | |
namespace br = ribomation::brc; | |
namespace pm = std::pmr; | |
using std::string_view; | |
using std::array; | |
/// Self-contained memory arena for one worker's hash map.
///
/// Members are declared in dependency order: `heap` allocates from `buffer`,
/// which hands out pieces of the inline `storage` array. The buffer's upstream
/// is the null resource, so once the 100'000 bytes are used up further
/// allocations throw std::bad_alloc instead of falling back to the global heap.
struct MapHeap {
    std::array<unsigned char, 100'000> storage = {};

    std::pmr::monotonic_buffer_resource buffer{
        storage.data(),
        storage.size(),
        std::pmr::null_memory_resource()
    };

    std::pmr::unsynchronized_pool_resource heap{&buffer};
};
struct Worker { | |
string_view chunk; | |
pm::unordered_map<string_view, br::Aggregation> data; | |
Worker(string_view chunk_, MapHeap& mapHeap) : chunk{chunk_}, data{&mapHeap.heap} { | |
data.reserve(500); | |
} | |
void run() { | |
try { | |
auto start = 0UL; | |
while (start < chunk.size()) { | |
auto m = br::extract(chunk, start); | |
data[m.station] += m.temperature; | |
} | |
} catch (std::exception const& x) { | |
std::cerr << "[WORKER] err: " << x.what() << "\n"; | |
} | |
} | |
}; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment