Skip to content

Instantly share code, notes, and snippets.

@ribomation
Created January 18, 2024 12:03
Show Gist options
  • Save ribomation/69a7f59a5b18ef8d7c8477b0ce2108fb to your computer and use it in GitHub Desktop.
Save ribomation/69a7f59a5b18ef8d7c8477b0ce2108fb to your computer and use it in GitHub Desktop.
Ambitious optimization of 1BRC using Modern C++
#pragma once
#include <ostream>
#include <algorithm>
#include <limits>
#include <format>
namespace ribomation::brc {
/// Running statistics (sample count, sum, minimum and maximum) for a series
/// of temperature readings. Individual samples and other partial
/// aggregations are folded in with `+=`.
struct Aggregation {
    unsigned count = 0;
    double sum = 0;
    // Seed values for the min/max folds: the first real sample replaces both.
    double min = std::numeric_limits<double>::max();
    // lowest() is the most negative finite double. numeric_limits<double>::min()
    // is the smallest *positive* normal value, so using it as the seed would
    // report a wrong maximum whenever every sample is negative.
    double max = std::numeric_limits<double>::lowest();

    /// Folds a single temperature sample into the statistics.
    void operator+=(double t) {
        ++count;
        sum += t;
        min = std::min(min, t);
        max = std::max(max, t);
    }

    /// Merges another (partial) aggregation into this one.
    void operator+=(Aggregation const& a) {
        count += a.count;
        sum += a.sum;
        min = std::min(min, a.min);
        max = std::max(max, a.max);
    }
};
/// Streams an aggregation as "<mean>C, <min>/<max> (<count>)", every
/// temperature with an explicit sign.
inline auto operator<<(std::ostream& os, Aggregation const& a) -> std::ostream& {
    auto const mean = a.sum / a.count;
    os << std::format("{:+.2f}C, {:+.1f}/{:+.1f} ({:Ld})", mean, a.min, a.max, a.count);
    return os;
}
}
#include <iostream>
#include <unordered_map>
#include <vector>
#include <thread>
#include "util.hxx"
#include "memory-mapped-file.hxx"
#include "aggregation.hxx"
#include "worker.hxx"
namespace rm = ribomation::util;
namespace io = ribomation::io;
namespace br = ribomation::brc;
using std::cout;
using std::string_view;
using std::pair;
using std::vector;
using std::unordered_map;
inline void
split_chunks(unsigned num_workers, string_view& data, vector<br::Worker>& workers, vector<br::MapHeap>& map_heap) {
auto const chunk_size = data.size() / num_workers;
cout << "chunk size: " << chunk_size * 1E-6 << " MB\n";
auto start = 0UL;
for (auto id = 0U; id < num_workers; ++id) {
auto size = chunk_size;
while (data[start + size] != '\n') ++size; //divide at newline
auto chunk = data.substr(start, size);
start += size + 1; //start at next row
workers.emplace_back(chunk, map_heap[id]);
}
}
inline void
launch_workers(unsigned num_workers, vector<br::Worker>& workers) {
cout << "launching " << num_workers << " worker threads...\n";
auto threads = std::vector<std::jthread>{};
threads.reserve(num_workers);
for (auto id = 0U; id < num_workers; ++id) {
threads.emplace_back(&br::Worker::run, &workers[id]);
}
}
/// Merges the per-worker station maps into a single map keyed by station
/// name; entries for the same station are combined with Aggregation's `+=`.
inline auto
collect_result(vector<br::Worker>& workers) -> unordered_map<string_view, br::Aggregation> {
    cout << "collecting results\n------\n";

    unordered_map<string_view, br::Aggregation> merged;
    for (auto& worker : workers) {
        for (auto& entry : worker.data) {
            merged[entry.first] += entry.second;
        }
    }
    return merged;
}
/// Prints every "<station>: <aggregation>" line to stdout, ordered by
/// station name. The map is copied into a vector because an unordered_map
/// cannot be sorted in place.
inline void
sort_print(unordered_map<string_view, br::Aggregation>& result) {
    using Row = pair<string_view, br::Aggregation>;

    vector<Row> rows(result.begin(), result.end());
    auto const by_station = [](Row const& a, Row const& b) { return a.first < b.first; };
    std::sort(rows.begin(), rows.end(), by_station);

    for (auto const& row : rows) {
        cout << row.first << ": " << row.second << "\n";
    }
}
/// Entry point: memory-maps the input file, splits it into one chunk per
/// worker thread, aggregates the chunks in parallel, then merges and prints
/// the per-station results. The whole pipeline is timed by rm::elapsed.
int main(int argc, char** argv) {
    auto filename = rm::getFilename(argc, argv);
    cout << "filename: " << filename << "\n";
    rm::elapsed([filename]() {
        auto file = io::MemoryMappedFile{filename};
        auto data = file.data();
        cout << "loaded " << data.size() * 1E-6 << " MB\n";

        // hardware_concurrency() is only a hint and may return 0 when the value
        // cannot be determined; always run with at least one worker.
        auto const detected = std::thread::hardware_concurrency();
        auto const T = detected > 0 ? detected : 1U;

        // mapHeap is declared before workers so that it outlives them: each
        // worker's map allocates from its MapHeap.
        auto mapHeap = vector<br::MapHeap>(T);
        auto workers = vector<br::Worker>{};
        workers.reserve(T);

        split_chunks(T, data, workers, mapHeap);
        launch_workers(T, workers);
        auto result = collect_result(workers);
        sort_print(result);
    });
}
#pragma once
#include <ostream>
#include <stdexcept>
#include <string_view>
#include "fast_double_parser.h"
namespace ribomation::brc {
using std::string_view;
using std::ostream;
/// One parsed input record: a station name and its temperature reading.
/// `station` is a non-owning view, so it is only valid while the text it
/// points into is alive.
struct Measurement {
    string_view station;       // empty by default
    double temperature = 0.0;
};
/// Parses one "<station>;<temperature>" record from `chunk`, beginning at
/// offset `start`.
///
/// @param chunk  text holding newline-separated records
/// @param start  in/out: offset of the record to parse; on return it is the
///               offset just past the record and its line terminator
/// @return the station name (a view into `chunk`) and the parsed temperature
/// @throws std::invalid_argument if no ';' is found at or after `start`;
///         `start` is then set to the end of the chunk
inline auto extract(string_view chunk, unsigned long& start) -> Measurement {
    auto const semi_colon = chunk.find(';', start);
    if (semi_colon == string_view::npos) {
        // Without this guard `npos - start + 1` wraps `start` around to a small
        // offset, and a caller looping on `start < chunk.size()` can spin forever.
        start = static_cast<unsigned long>(chunk.size());
        throw std::invalid_argument{"malformed record: missing ';' separator"};
    }
    auto const station = chunk.substr(start, semi_colon - start);

    auto const temp_begin = semi_colon + 1;
    auto const nl = chunk.find('\n', temp_begin);
    // When nl is npos the count is huge and substr clamps it to the rest of the chunk.
    auto const temp_text = chunk.substr(temp_begin, nl - temp_begin);
    start = static_cast<unsigned long>(temp_begin + temp_text.size() + 1);

    double temperature{};
    if (nl == string_view::npos) {
        // Last record of the chunk: no terminating character is guaranteed to
        // follow inside the viewed memory. parse_number receives only a pointer
        // (no length), so parse from a zero-terminated local copy instead.
        char tail[64]{};
        temp_text.copy(tail, sizeof(tail) - 1);
        [[maybe_unused]] auto ptr = fast_double_parser::parse_number(tail, &temperature);
    } else {
        // The '\n' that follows the number stops the parser.
        [[maybe_unused]] auto ptr = fast_double_parser::parse_number(temp_text.data(), &temperature);
    }
    return {station, temperature};
}
/// Streams a measurement as "Measurement{<station>, <temperature>}".
inline auto operator<<(ostream& os, Measurement const& m) -> ostream& {
    os << "Measurement{" << m.station;
    os << ", " << m.temperature;
    os << "}";
    return os;
}
}
#include <iostream>
#include <string>
#include <chrono>
#include "util.hxx"
namespace ribomation::util {
using namespace std::string_literals;
namespace cr = std::chrono;
using std::string;
/// Runs `stmts` and then reports its wall-clock duration, in seconds with
/// millisecond precision, on std::cerr.
void elapsed(std::function<void()> const& stmts) {
    // steady_clock is monotonic, so the measured interval cannot be distorted
    // by system clock adjustments; high_resolution_clock gives no such guarantee.
    auto const startTime = cr::steady_clock::now();
    stmts();
    auto const endTime = cr::steady_clock::now();

    auto const elapsed = cr::duration<double>{endTime - startTime};  // seconds
    std::cerr << "------\n" << std::format("Elapsed time: {:.3f} seconds\n", elapsed.count());
}
/// Resolves the input file name from the command line.
///
/// The only accepted option is `-f <path>`; if it is given more than once the
/// last occurrence wins. Without it the default data file is returned.
///
/// @throws std::invalid_argument (after printing a usage line to std::cerr)
///         on an unknown argument or when `-f` is not followed by a value
auto getFilename(int argc, char** argv) -> std::string {
    auto filename = std::string{"data/weather-data-1M.csv"};

    auto const usage_error = [&]() {
        std::cerr << "usage: " << argv[0] << " [-f <str>]\n";
        return std::invalid_argument{"usage"};
    };

    for (auto k = 1; k < argc; ++k) {
        auto const arg = std::string{argv[k]};
        if (arg != "-f") throw usage_error();
        // A trailing "-f" has no value: argv[argc] is a null pointer, and
        // assigning that to a std::string is undefined behaviour.
        if (k + 1 >= argc) throw usage_error();
        filename = argv[++k];
    }
    return filename;
}
}
#pragma once
#include <functional>
#include <string>
// Small command-line and timing helpers shared by the application.
namespace ribomation::util {
// Invokes `stmts`, then writes the time it took (in seconds) to std::cerr.
void elapsed(const std::function<void()>& stmts);
// Returns the input file name: the value following a `-f` argument, or the
// built-in default "data/weather-data-1M.csv" when `-f` is absent. Any other
// argument prints a usage line to std::cerr and throws std::invalid_argument.
auto getFilename(int argc, char** argv) -> std::string;
}
#pragma once
#include <iostream>
#include <string_view>
#include <array>
#include <memory_resource>
#include <unordered_map>
#include "measurement.hxx"
#include "aggregation.hxx"
namespace ribomation::brc {
namespace br = ribomation::brc;
namespace pm = std::pmr;
using std::string_view;
using std::array;
// Self-contained memory arena that backs one Worker's station map.
// The members are declared in dependency order (storage -> buffer -> heap);
// each initializer refers to the member declared before it, so the order
// must not be changed.
struct MapHeap {
// Fixed 100,000-byte block embedded in the object itself.
array<unsigned char, 100'000> storage{};
// Hands out memory from `storage`. The upstream is null_memory_resource(), so
// exhausting the block throws std::bad_alloc rather than falling back to
// another allocator.
// NOTE(review): whether 100,000 bytes is enough depends on the number of
// distinct stations in the input - confirm against the expected data set.
pm::monotonic_buffer_resource buffer{storage.data(), storage.size(), pm::null_memory_resource()};
// Pool resource layered on `buffer`; unsynchronized, so it must only be used
// from one thread at a time.
pm::unsynchronized_pool_resource heap{&buffer};
};
/// Aggregates the measurements found in one chunk of the input.
/// `data` allocates from the MapHeap given to the constructor, so that
/// MapHeap must outlive the worker.
struct Worker {
    string_view chunk;                                     // text this worker parses
    pm::unordered_map<string_view, br::Aggregation> data;  // per-station statistics

    /// Binds the worker to its chunk and arena, and pre-sizes the map for
    /// 500 elements.
    Worker(string_view chunk_, MapHeap& mapHeap)
        : chunk{chunk_}, data{&mapHeap.heap} {
        data.reserve(500);
    }

    /// Parses every record in `chunk` and folds it into `data`. Any
    /// std::exception is reported on std::cerr and ends the run early.
    void run() {
        try {
            // extract() advances `offset` past each record it consumes.
            for (auto offset = 0UL; offset < chunk.size();) {
                auto const measurement = br::extract(chunk, offset);
                data[measurement.station] += measurement.temperature;
            }
        } catch (std::exception const& x) {
            std::cerr << "[WORKER] err: " << x.what() << "\n";
        }
    }
};
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment