|
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
|
// llil4emh_buf.cc (using string_cnt_ub.h) |
|
// An emhash6::HashMap demonstration. |
|
// https://gist.github.com/marioroy/d02881b96b20fa1adde4388b3e216163 |
|
// |
|
// April 25, 2024 |
|
// Based on llil3m.cpp https://perlmonks.com/?node_id=11149482 |
|
// Original challenge https://perlmonks.com/?node_id=11147822 |
|
// and summary https://perlmonks.com/?node_id=11150293 |
|
// Other demonstrations https://perlmonks.com/?node_id=11149907 |
|
// |
|
// Authors |
|
// Mario Roy - C++ demonstration with parallel capabilities |
|
// eyepopslikeamosquito - Co-author, learning C++ at PerlMonks.com |
|
// Gregory Popovitch - Further changes and simplification |
|
// |
|
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
|
// OpenMP Little Book - https://nanxiao.gitbooks.io/openmp-little-book |
|
// |
|
// Obtain the emhash hashmap library (required dependency): |
|
// git clone --depth=1 https://github.com/ktprime/emhash |
|
// |
|
// Compile on Linux (clang++ or g++): |
|
// clang++ -o llil4emh_buf -std=c++20 -fopenmp -Wall -O3 llil4emh_buf.cc |
|
// |
|
// On macOS, use g++-12 from https://brew.sh (installation: brew install gcc@12). |
|
// The g++ command also works with mingw C++ compiler (https://sourceforge.net/projects/mingw-w64) |
|
// that comes bundled with Strawberry Perl (C:\Strawberry\c\bin\g++.exe). |
|
// |
|
// Obtain gen-llil.pl and gen-long-llil.pl from https://perlmonks.com/?node_id=11148681 |
|
// perl gen-llil.pl big1.txt 200 3 1 |
|
// perl gen-llil.pl big2.txt 200 3 1 |
|
// perl gen-llil.pl big3.txt 200 3 1 |
|
// |
|
// To make random input, obtain shuffle.pl from https://perlmonks.com/?node_id=11149800 |
|
// perl shuffle.pl big1.txt >tmp && mv tmp big1.txt |
|
// perl shuffle.pl big2.txt >tmp && mv tmp big2.txt |
|
// perl shuffle.pl big3.txt >tmp && mv tmp big3.txt |
|
// |
|
// Example run (binary name as produced by the compile command above):
//   ./llil4emh_buf big1.txt big2.txt big3.txt >out.txt
//   NUM_THREADS=3 ./llil4emh_buf ...
|
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
|
|
|
#include <cassert> |
|
#include <cstdio> |
|
#include <cstddef> |
|
#include <cstdint> |
|
#include <cstdlib> |
|
#include <cstring> |
|
#include <ctime> |
|
#include <compare> |
|
#include <chrono> |
|
|
|
#include <string> |
|
#include <string_view> |
|
|
|
#include <array> |
|
#include <vector> |
|
|
|
#include <thread> |
|
#include <execution> |
|
#include <atomic> |
|
|
|
#include <iomanip> |
|
#include <iostream> |
|
#include <fstream> |
|
|
|
#if defined(__GNUC__) && !defined(__clang__) |
|
#pragma GCC diagnostic push |
|
#pragma GCC diagnostic ignored "-Wunused-but-set-variable" |
|
#pragma GCC diagnostic ignored "-Wclass-memaccess" |
|
#include "emhash/hash_table6.hpp" |
|
#pragma GCC diagnostic pop |
|
#else |
|
#include "emhash/hash_table6.hpp" |
|
#endif |
|
|
|
static_assert(sizeof(size_t) == sizeof(int64_t), "size_t too small, need a 64-bit compile"); |
|
|
|
// Specify 0/1 to use boost's parallel sorting algorithm; faster than __gnu_parallel::sort. |
|
// https://www.boost.org/doc/libs/1_85_0/libs/sort/doc/html/sort/parallel.html |
|
// https://www.boost.org/doc/libs/1_85_0/libs/sort/doc/papers/block_indirect_sort_en.pdf |
|
// This requires the boost header files: e.g. devpkg-boost bundle on Clear Linux. |
|
// Note: Another option is downloading and unpacking Boost locally. |
|
// (no need to build it because the bits we use are header file only) |
|
#define USE_BOOST_PARALLEL_SORT 1 |
|
|
|
#if USE_BOOST_PARALLEL_SORT |
|
#ifdef __clang__ |
|
#pragma clang diagnostic push |
|
#pragma clang diagnostic ignored "-Wunused-parameter" |
|
#pragma clang diagnostic ignored "-Wshadow" |
|
#include <boost/sort/sort.hpp> |
|
#pragma clang diagnostic pop |
|
#else |
|
#include <boost/sort/sort.hpp> |
|
#endif |
|
#endif |
|
|
|
#ifdef _OPENMP |
|
#include <omp.h> |
|
#endif |
|
|
|
// A small test-and-test-and-set spin lock.
//
// lock() first tries to grab the flag with an atomic exchange; while the flag
// is held by someone else it spins on a relaxed load (read-only, so the cache
// line is not bounced between cores) and only retries the exchange once the
// flag has been observed clear.
//
// The class provides lock(), try_lock() and unlock(), so it can be used with
// std::lock_guard / std::scoped_lock as well as called directly.
//
// https://rigtorp.se/spinlock/
// https://vorbrodt.blog/2019/02/12/fast-mutex/
class spinlock_mutex {
public:
    spinlock_mutex() noexcept = default;

    // Copying a lock makes no sense: both copy operations are disabled.
    spinlock_mutex(const spinlock_mutex&) = delete;
    spinlock_mutex& operator=(const spinlock_mutex& rhs) = delete;

    // Block (busy-wait) until the lock has been acquired.
    void lock() noexcept {
        for (;;) {
            if (!lock_.exchange(true, std::memory_order_acquire))
                return;
            while (lock_.load(std::memory_order_relaxed))
                cpu_relax();
        }
    }

    // Try to acquire the lock without waiting.
    // Returns true when the lock was acquired by this call.
    bool try_lock() noexcept {
        // Cheap relaxed load first so a held lock is not hammered with
        // read-modify-write operations.
        return !lock_.load(std::memory_order_relaxed) &&
               !lock_.exchange(true, std::memory_order_acquire);
    }

    // Release the lock. Must only be called by the current holder.
    void unlock() noexcept {
        lock_.store(false, std::memory_order_release);
    }

private:
    // Spin-wait hint. __builtin_ia32_pause() exists only on x86, so other
    // architectures get an equivalent hint or fall back to yielding.
    static void cpu_relax() noexcept {
#if defined(__i386__) || defined(__x86_64__)
        __builtin_ia32_pause();
#elif defined(__aarch64__)
        __asm__ __volatile__("yield" ::: "memory");
#else
        std::this_thread::yield();
#endif
    }

    // Over-aligned so that neighbouring locks in an array do not share storage
    // within the same aligned region.
    alignas(4 * sizeof(std::max_align_t)) std::atomic_bool lock_{false};
};
|
|
|
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
|
|
|
// Integer type used for the per-key counts.
using int_type = uint32_t;
|
|
|
#include "string_cnt_ub.h" // union std::array plus buffer allocation |
|
|
|
inline constexpr size_t MAX_THREADS = 64; |
|
inline constexpr size_t SSO_LENGTH = 12; |
|
|
|
using string_cnt = string_cnt_t<SSO_LENGTH, MAX_THREADS>; |
|
|
|
namespace std { |
|
template<> struct hash<string_cnt> { |
|
std::size_t operator()(const string_cnt& v) const noexcept { return v.hash(); }; |
|
}; |
|
} |
|
|
|
using vec_str_int_type = std::vector<string_cnt>; |
|
using map_str_int_type = emhash6::HashMap<string_cnt, uint8_t>; // value unused |
|
|
|
// Mimic the Perl get_properties subroutine ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
|
|
|
// Parse a run of leading decimal digits into a uint32_t.
// Parsing stops at the first character that is not '0'..'9' (for example
// '\n' or '\0'); an input without leading digits yields 0. No overflow
// check is made: the result wraps modulo 2^32.
inline uint32_t fast_atoll64(const char* str)
{
    uint32_t result = 0;
    for (const char* p = str; ; ++p) {
        // A non-digit maps to a value above 9 after the unsigned wrap-around.
        const uint8_t digit = uint8_t(*p - '0');
        if (digit > 9)
            break;
        result = result * 10 + digit;
    }
    return result;
}
|
|
|
// Locate the first occurrence of c in the range [first, last).
// Returns a pointer to the match, or last when c does not occur
// (including when the range is empty).
inline char* find_char(char* first, char* last, char c)
{
    if (first == last)
        return last;

    void* hit = std::memchr(first, c, static_cast<size_t>(last - first));
    return hit ? static_cast<char*>(hit) : last;
}
|
|
|
// Tunables for reading, sharding and writing.
//   CHUNK_SIZE   - bytes read from the input per chunk, and records emitted
//                  per output chunk.
//   MAX_LINE_LEN - room reserved after a chunk to complete its last,
//                  partially read line.
//   NUM_MAPS     - number of hash-map shards (and of their locks).
inline constexpr size_t CHUNK_SIZE{32 * 1024};
inline constexpr size_t MAX_LINE_LEN{255};
inline constexpr size_t NUM_MAPS{size_t{1} << 12};
|
|
|
static int64_t get_properties( |
|
const char* fname, // in : the input file name |
|
const int nthds, // in : the number of threads |
|
auto& L, // in : the locks array |
|
auto& M) // inout : the maps array |
|
{ |
|
int64_t num_lines = 0; |
|
std::ifstream fin(fname, std::ifstream::binary); |
|
if (!fin.is_open()) { |
|
std::cerr << "Error opening '" << fname << "' : " << strerror(errno) << '\n'; |
|
return num_lines; |
|
} |
|
|
|
#pragma omp parallel reduction(+:num_lines) |
|
{ |
|
std::string buf; |
|
buf.resize(CHUNK_SIZE + MAX_LINE_LEN + 1, '\0'); |
|
int tid = omp_get_thread_num(); |
|
|
|
while (fin.good()) { |
|
size_t len = 0; |
|
|
|
// Read the next chunk serially. |
|
#pragma omp critical |
|
{ |
|
fin.read(&buf[0], CHUNK_SIZE); |
|
if ((len = fin.gcount()) > 0) { |
|
if (buf[len - 1] != '\n' && fin.getline(&buf[len], MAX_LINE_LEN)) { |
|
// Getline discards the newline char and appends null char. |
|
// Therefore, change '\0' to '\n'. |
|
len += fin.gcount(); |
|
buf[len - 1] = '\n'; |
|
} |
|
} |
|
} |
|
|
|
if (!len) |
|
break; |
|
|
|
buf[len] = '\0'; |
|
char *first = &buf[0]; |
|
char *last = &buf[len]; |
|
|
|
// Process max Nthreads chunks concurrently. |
|
while (first < last) { |
|
char* beg_ptr{first}; |
|
char* end_ptr{find_char(first, last, '\n')}; |
|
char* found = find_char(beg_ptr, end_ptr, '\t'); |
|
|
|
first = end_ptr + 1; |
|
if (found == end_ptr) |
|
continue; |
|
|
|
assert(*found == '\t'); |
|
int_type count = fast_atoll64(found + 1); |
|
string_cnt s{tid, beg_ptr, (size_t)(found - beg_ptr), count}; |
|
|
|
if (nthds == 1) { |
|
auto [it, success] = M[0].try_emplace(std::move(s), 0); |
|
if (!success) { |
|
((string_cnt&)(it->first)).incr(count); |
|
s.mem_rollback(tid); |
|
} |
|
} |
|
else { |
|
size_t hv = s.hash(); |
|
size_t idx = ( ((hv & 0x000000000000ffffULL) << 16) | |
|
((hv & 0x00000000ffff0000ULL) >> 16) ) % NUM_MAPS; |
|
|
|
L[idx].lock(); |
|
auto [it, success] = M[idx].try_emplace(std::move(s), 0); |
|
if (!success) { |
|
((string_cnt&)(it->first)).incr(count); |
|
s.mem_rollback(tid); |
|
} |
|
L[idx].unlock(); |
|
} |
|
|
|
++num_lines; |
|
} |
|
} |
|
} |
|
|
|
fin.close(); |
|
// std::cerr << "getprops done\n"; |
|
return num_lines; |
|
} |
|
|
|
// Output subroutine ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
|
|
|
// Integer division rounded up: the smallest q with q * divisor >= dividend.
// divisor must not be zero.
size_t divide_up(size_t dividend, size_t divisor)
{
    const size_t quotient      = dividend / divisor;
    const bool   has_remainder = (dividend % divisor) != 0;
    return has_remainder ? quotient + 1 : quotient;
}
|
|
|
static void out_properties( |
|
const int nthds, // in : the number of threads |
|
vec_str_int_type& vec) // in : the vector to output |
|
{ |
|
size_t num_chunks = divide_up(vec.size(), CHUNK_SIZE); |
|
|
|
#ifdef _OPENMP |
|
int nthds_out = std::min(nthds, 32); |
|
#pragma omp parallel for ordered schedule(static, 1) num_threads(nthds_out) |
|
#endif |
|
for (size_t chunk_id = 1; chunk_id <= num_chunks; ++chunk_id) { |
|
std::string str(""); str.reserve(2048 * 1024); |
|
auto it = vec.begin() + (chunk_id - 1) * CHUNK_SIZE; |
|
auto it2 = vec.begin() + std::min(vec.size(), chunk_id * CHUNK_SIZE); |
|
|
|
for (; it != it2; ++it) { |
|
str.append(it->getstr()); |
|
str.append("\t", 1); |
|
str.append(std::to_string(it->getcnt())); |
|
str.append("\n", 1); |
|
} |
|
|
|
#pragma omp ordered |
|
std::cout << str << std::flush; |
|
} |
|
} |
|
|
|
// Short names for the chrono types used by the timing code.
using high_resolution_clock = std::chrono::high_resolution_clock;
using time_point            = std::chrono::high_resolution_clock::time_point;
using milliseconds          = std::chrono::milliseconds;

// Time from cstart to cend in seconds. The interval is first truncated to
// whole milliseconds, so the result has millisecond resolution.
double elaspe_time(time_point cend, time_point cstart) {
    const auto whole_ms = std::chrono::duration_cast<milliseconds>(cend - cstart);
    return double(whole_ms.count()) * 1e-3;
}
|
|
|
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
|
|
|
int main(int argc, char* argv[]) |
|
{ |
|
if (argc < 2) { |
|
if (argc > 0) |
|
std::cerr << "usage: llil4emh file1 file2 ... >out.txt\n"; |
|
return 1; |
|
} |
|
|
|
std::cerr << std::setprecision(3) << std::setiosflags(std::ios::fixed); |
|
std::cerr << "llil4emh start\n"; |
|
#ifdef _OPENMP |
|
std::cerr << "use OpenMP\n"; |
|
#else |
|
std::cerr << "don't use OpenMP\n"; |
|
#endif |
|
#if USE_BOOST_PARALLEL_SORT == 0 |
|
std::cerr << "don't use boost sort\n"; |
|
#else |
|
std::cerr << "use boost sort\n"; |
|
#endif |
|
time_point cstart1, cend1, cstart2, cend2, cstart3, cend3s, cend3; |
|
cstart1 = high_resolution_clock::now(); |
|
|
|
#ifdef _OPENMP |
|
// Determine the number of threads. |
|
const char* env_nthds = std::getenv("NUM_THREADS"); |
|
int nthds = ( env_nthds && strlen(env_nthds) ) |
|
? ::atoi(env_nthds) |
|
: std::thread::hardware_concurrency(); |
|
nthds = std::min((size_t)nthds, MAX_THREADS); |
|
omp_set_dynamic(false); |
|
omp_set_num_threads(nthds); |
|
omp_set_max_active_levels(1); |
|
int nthds_move = std::min(nthds, 16); |
|
#else |
|
int nthds = 1; |
|
int nthds_move = 1; |
|
#endif |
|
|
|
// Get the list of input files from the command line |
|
int nfiles = argc - 1; |
|
char** fname = &argv[1]; |
|
|
|
// Store the properties into a vector |
|
vec_str_int_type propvec; |
|
int64_t num_lines = 0; |
|
int64_t num_keys = 0; |
|
|
|
{ |
|
// Enclose shared vars L and M inside a block, for running parallel. |
|
// So GC releases the objects immediately after exiting the scope. |
|
spinlock_mutex L[NUM_MAPS]; |
|
map_str_int_type M[NUM_MAPS]; |
|
|
|
for (int i = 0; i < nfiles; ++i) |
|
num_lines += get_properties(fname[i], nthds, L, M); |
|
|
|
for (size_t i = 0; i < NUM_MAPS; ++i) |
|
num_keys += M[i].size(); |
|
|
|
cend1 = high_resolution_clock::now(); |
|
double ctaken1 = elaspe_time(cend1, cstart1); |
|
std::cerr << "get properties " << std::setw(8) << ctaken1 << " secs\n"; |
|
|
|
if (num_keys == 0) { |
|
std::cerr << "No work, exiting...\n"; |
|
return 1; |
|
} |
|
|
|
cstart2 = high_resolution_clock::now(); |
|
|
|
if (nthds == 1) { |
|
propvec.reserve(num_keys); |
|
for (auto const& x : M[0]) |
|
propvec.emplace_back(std::move(const_cast<string_cnt&>(x.first))); |
|
|
|
M[0].clear(); |
|
} |
|
else { |
|
propvec.resize(num_keys); |
|
std::array<vec_str_int_type::iterator, NUM_MAPS> I; |
|
I[0] = propvec.begin(); |
|
|
|
for (size_t i = 1; i < NUM_MAPS; ++i) |
|
I[i] = I[i-1] + M[i-1].size(); |
|
|
|
#pragma omp parallel for schedule(static, 1) num_threads(nthds_move) |
|
for (size_t i = 0; i < NUM_MAPS; ++i) { |
|
auto it = I[i]; |
|
for (auto const& x : M[i]) |
|
*it++ = std::move(const_cast<string_cnt&>(x.first)); |
|
|
|
map_str_int_type().swap(M[i]); // swap vs. clear to reclaim mem early |
|
} |
|
} |
|
|
|
cend2 = high_resolution_clock::now(); |
|
double ctaken2 = elaspe_time(cend2, cstart2); |
|
std::cerr << "map to vector " << std::setw(8) << ctaken2 << " secs\n"; |
|
} |
|
|
|
cstart3 = high_resolution_clock::now(); |
|
|
|
// Sort the vector by (count) in reverse order, (name) in lexical order |
|
auto reverse_order = [](const string_cnt& left, const string_cnt& right) { |
|
int_type left_cnt = left.getcnt(), right_cnt = right.getcnt(); |
|
return left_cnt != right_cnt ? left_cnt > right_cnt : left < right; |
|
}; |
|
#if USE_BOOST_PARALLEL_SORT == 0 |
|
// Standard sort |
|
std::sort(propvec.begin(), propvec.end(), reverse_order); |
|
#else |
|
// Parallel sort |
|
boost::sort::block_indirect_sort( |
|
propvec.begin(), propvec.end(), reverse_order, |
|
nthds |
|
); |
|
#endif |
|
|
|
cend3s = high_resolution_clock::now(); |
|
|
|
// Output the sorted vector |
|
out_properties(nthds, propvec); |
|
cend3 = high_resolution_clock::now(); |
|
|
|
double ctaken = elaspe_time(cend3, cstart1); |
|
double ctaken3s = elaspe_time(cend3s, cstart3); |
|
double ctaken3o = elaspe_time(cend3, cend3s); |
|
|
|
std::cerr << "vector stable sort " << std::setw(8) << ctaken3s << " secs\n"; |
|
std::cerr << "write stdout " << std::setw(8) << ctaken3o << " secs\n"; |
|
std::cerr << "total time " << std::setw(8) << ctaken << " secs\n"; |
|
std::cerr << " count lines " << num_lines << "\n"; |
|
std::cerr << " count unique " << propvec.size() << "\n"; |
|
|
|
// Free dynamically allocated memory in parallel. |
|
#pragma omp parallel for schedule(static, 200000) num_threads(nthds) |
|
for (size_t i = 0; i < propvec.size(); i++) |
|
propvec[i].free(); |
|
|
|
// Hack to see Private Bytes in Windows Task Manager |
|
// (uncomment next line so process doesn't exit too quickly) |
|
// std::this_thread::sleep_for(milliseconds(9000)); |
|
|
|
return 0; |
|
} |