@marioroy
Last active April 26, 2024 05:57
C++ map, set, and vector demonstrations using string_cnt_ub.h (dynamic buffer allocation)

This gist contains five C++ programs, one header file, and four utility scripts for the Long List is Long (LLiL) challenge.

  1. llil4emh_buf.cc - emhash6::HashMap demonstration
  2. llil4hmap_buf.cc - phmap::flat_hash_set demonstration
  3. llil4map_buf.cc - phmap::parallel_flat_hash_set demonstration
  4. llil4umap_buf.cc - std::unordered_set demonstration
  5. llil4vec_buf.cc - std::vector demonstration
  6. string_cnt_ub.h - union std::array with dynamic buffer allocation
  7. gen-files-big.sh - generate 92 big files (total 2.8 GB)
  8. gen-files-long.sh - generate 92 long files (total 5.0 GB)
  9. gen-llil.pl - generate LLiL test file
  10. shuffle.pl - shuffle LLiL test file
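
For reference, gen-llil.pl writes the LLiL input format: one key per line, a word followed by a tab and a count of 1. The "big" words are a three-letter prefix ('aaa' .. 'zzz') plus three random letters, so generated lines look roughly like this (the random suffixes will differ per run):

aaafqz	1
aabmrk	1
zzzwtd	1

The programs sum the counts per unique word and write one line per word (word, tab, total), sorted by count descending and then by word ascending.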

Obtain the dependencies. The Boost.Sort headers are also used by default (set USE_BOOST_PARALLEL_SORT to 0 in the sources to build without Boost).

git clone --depth=1 https://github.com/ktprime/emhash
git clone --depth=1 https://github.com/greg7mdp/parallel-hashmap

Build with clang++ (preferred) or g++.

clang++ -o llil4emh -std=c++20 -fopenmp -Wall -O3 llil4emh_buf.cc
clang++ -o llil4hmap -std=c++20 -fopenmp -Wall -O3 llil4hmap_buf.cc -I./parallel-hashmap
clang++ -o llil4map -std=c++20 -fopenmp -Wall -O3 llil4map_buf.cc -I./parallel-hashmap
clang++ -o llil4umap -std=c++20 -fopenmp -Wall -O3 llil4umap_buf.cc
clang++ -o llil4vec -std=c++20 -fopenmp -Wall -O3 llil4vec_buf.cc

Generate input files. Ensure adequate disk space.

bash gen-files-big.sh   # 2.8 GB
bash gen-files-long.sh  # 5.0 GB, optional
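
Both scripts write into ./tmp, naming the files bigaa .. bigdn and longaa .. longdn (three loops of 26 files plus one of 14, so 92 files per set). A quick way to confirm generation completed:

ls tmp/big* | wc -l    # expect 92
ls tmp/long* | wc -l   # expect 92, if the long set was generated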

Run the programs.

cd tmp

# Note: ~16.0 GB of memory needed to process the long files
../llil4map big* big* big* | cksum
../llil4map long* long* long* | cksum

# Note: ~29.2 GB of memory needed to process the long files
../llil4vec big* big* big* | cksum
../llil4vec long* long* long* | cksum

# Append a suffix letter to the glob to process fewer files and use less memory
../llil4map longa* longa* longa* | cksum
../llil4vec longa* longa* longa* | cksum
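
All five programs are expected to produce identical sorted output for the same input files, so piping to cksum gives a quick cross-check: the checksum and byte count should match across llil4emh, llil4hmap, llil4map, llil4umap, and llil4vec. For example:

../llil4map big* big* big* | cksum
../llil4umap big* big* big* | cksum   # should print the same checksum and size
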
#!/bin/bash
# Script for generating input files for llil4map.cc and llil4vec.cc.
# 2.8 GB disk utilization, 31 MB per file.
# Usage: bash gen-files-big.sh
mkdir -p tmp
cp gen-llil.pl shuffle.pl tmp
cd tmp
# create 26 random files
for n in $(perl -le "print for 'aa'..'az'"); do
    perl gen-llil.pl big$n 200 3 1
    perl shuffle.pl big$n >1; mv 1 big$n
done &
# create 26 random files
for n in $(perl -le "print for 'ba'..'bz'"); do
    perl gen-llil.pl big$n 200 3 1
    perl shuffle.pl big$n >2; mv 2 big$n
done &
# create 26 random files
for n in $(perl -le "print for 'ca'..'cz'"); do
    perl gen-llil.pl big$n 200 3 1
    perl shuffle.pl big$n >3; mv 3 big$n
done &
# create 14 random files (total 92 files)
for n in $(perl -le "print for 'da'..'dn'"); do
    perl gen-llil.pl big$n 200 3 1
    perl shuffle.pl big$n >4; mv 4 big$n
done &
cd ..
wait
#!/bin/bash
# Script for generating input files for llil4map.cc and llil4vec.cc.
# Mixed sizes, alternating 74 MB and 31 MB. 5.0 GB disk utilization.
# Usage: bash gen-files-long.sh
mkdir -p tmp
cp gen-llil.pl shuffle.pl tmp
cd tmp
# create 26 random files
for n in $(perl -le "print for 'aa'..'az'"); do
    perl gen-llil.pl long$n 200 16 1
    perl shuffle.pl long$n >1; mv 1 long$n
done &
# create 26 random files
for n in $(perl -le "print for 'ba'..'bz'"); do
    perl gen-llil.pl long$n 200 3 1
    perl shuffle.pl long$n >2; mv 2 long$n
done &
# create 26 random files
for n in $(perl -le "print for 'ca'..'cz'"); do
    perl gen-llil.pl long$n 200 16 1
    perl shuffle.pl long$n >3; mv 3 long$n
done &
# create 14 random files (total 92 files)
for n in $(perl -le "print for 'da'..'dn'"); do
    perl gen-llil.pl long$n 200 3 1
    perl shuffle.pl long$n >4; mv 4 long$n
done &
cd ..
wait
# gen-llil.pl
# Crude program to generate a big LLiL test file to use in benchmarks
# Author: eyepopslikeamosquito, https://perlmonks.com/?node_id=11148681
# On Windows running:
# perl gen-llil.pl big2.txt 200 3 - produces a test file with size = 35,152,000 bytes
# (lines terminated with "\r\n")
# perl gen-llil.pl big2.txt 200 3 1 - produces a test file with size = 31,636,800 bytes
# (lines terminated with "\n")
# On Unix, lines are terminated with "\n" and the file size is always 31,636,800 bytes
use strict;
use warnings;
use autodie;
{
    my $ordmin = ord('a');
    my $ordmax = ord('z') + 1;

    # Generate a random word
    sub gen_random_word {
        my $word  = shift;   # word prefix
        my $nchar = shift;   # the number of random chars to append
        for my $i (1 .. $nchar) {
            $word .= chr( $ordmin + int( rand($ordmax - $ordmin) ) );
        }
        return $word;
    }
}

sub create_test_file {
    my $fname   = shift;
    my $count   = shift;
    my $wordlen = shift;
    my $fbin    = shift;
    open( my $fh_out, '>', $fname );
    $fbin and binmode($fh_out);
    for my $c ( 'aaa' .. 'zzz' ) {
        for my $i (1 .. $count) {
            print {$fh_out} gen_random_word( $c, $wordlen ) . "\t" . 1 . "\n";
        }
    }
}

my $outfile = shift;
my $count   = shift;
my $wordlen = shift;
my $fbin    = shift;   # default is to use text stream (not a binary stream)
defined($fbin) or $fbin = 0;
$outfile or die "usage: $0 outfile count wordlen\n";
$count   or die "usage: $0 outfile count wordlen\n";
print "generating test file '$outfile' with count '$count' (binmode=$fbin)\n";
create_test_file($outfile, $count, $wordlen, $fbin);
print "file size=", -s $outfile, "\n";
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// llil4emh_buf.cc (using string_cnt_ub.h)
// An emhash6::HashMap demonstration.
// https://gist.github.com/marioroy/d02881b96b20fa1adde4388b3e216163
//
// April 25, 2024
// Based on llil3m.cpp https://perlmonks.com/?node_id=11149482
// Original challenge https://perlmonks.com/?node_id=11147822
// and summary https://perlmonks.com/?node_id=11150293
// Other demonstrations https://perlmonks.com/?node_id=11149907
//
// Authors
// Mario Roy - C++ demonstration with parallel capabilities
// eyepopslikeamosquito - Co-author, learning C++ at PerlMonks.com
// Gregory Popovitch - Further changes and simplification
//
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// OpenMP Little Book - https://nanxiao.gitbooks.io/openmp-little-book
//
// Obtain the emhash hashmap library (required dependency):
// git clone --depth=1 https://github.com/ktprime/emhash
//
// Compile on Linux (clang++ or g++):
// clang++ -o llil4emh_buf -std=c++20 -fopenmp -Wall -O3 llil4emh_buf.cc
//
// On macOS, use g++-12 from https://brew.sh (installation: brew install gcc@12).
// The g++ command also works with mingw C++ compiler (https://sourceforge.net/projects/mingw-w64)
// that comes bundled with Strawberry Perl (C:\Strawberry\c\bin\g++.exe).
//
// Obtain gen-llil.pl and gen-long-llil.pl from https://perlmonks.com/?node_id=11148681
// perl gen-llil.pl big1.txt 200 3 1
// perl gen-llil.pl big2.txt 200 3 1
// perl gen-llil.pl big3.txt 200 3 1
//
// To make random input, obtain shuffle.pl from https://perlmonks.com/?node_id=11149800
// perl shuffle.pl big1.txt >tmp && mv tmp big1.txt
// perl shuffle.pl big2.txt >tmp && mv tmp big2.txt
// perl shuffle.pl big3.txt >tmp && mv tmp big3.txt
//
// Example run: llil4emh big1.txt big2.txt big3.txt >out.txt
// NUM_THREADS=3 llil4emh ...
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#include <cassert>
#include <cstdio>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <compare>
#include <chrono>
#include <string>
#include <string_view>
#include <array>
#include <vector>
#include <thread>
#include <execution>
#include <atomic>
#include <iomanip>
#include <iostream>
#include <fstream>
#if defined(__GNUC__) && !defined(__clang__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-but-set-variable"
#pragma GCC diagnostic ignored "-Wclass-memaccess"
#include "emhash/hash_table6.hpp"
#pragma GCC diagnostic pop
#else
#include "emhash/hash_table6.hpp"
#endif
static_assert(sizeof(size_t) == sizeof(int64_t), "size_t too small, need a 64-bit compile");
// Specify 0/1 to use boost's parallel sorting algorithm; faster than __gnu_parallel::sort.
// https://www.boost.org/doc/libs/1_85_0/libs/sort/doc/html/sort/parallel.html
// https://www.boost.org/doc/libs/1_85_0/libs/sort/doc/papers/block_indirect_sort_en.pdf
// This requires the boost header files: e.g. devpkg-boost bundle on Clear Linux.
// Note: Another option is downloading and unpacking Boost locally.
// (no need to build it because the bits we use are header file only)
#define USE_BOOST_PARALLEL_SORT 1
#if USE_BOOST_PARALLEL_SORT
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-parameter"
#pragma clang diagnostic ignored "-Wshadow"
#include <boost/sort/sort.hpp>
#pragma clang diagnostic pop
#else
#include <boost/sort/sort.hpp>
#endif
#endif
#ifdef _OPENMP
#include <omp.h>
#endif
class spinlock_mutex {
// https://rigtorp.se/spinlock/
// https://vorbrodt.blog/2019/02/12/fast-mutex/
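// Test-and-test-and-set: spin on a relaxed load (with a CPU pause hint)
// until the lock appears free, then retry the acquiring exchange.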
public:
// Assignment is disabled.
spinlock_mutex& operator=(const spinlock_mutex& rhs) = delete;
void lock() noexcept {
for (;;) {
if (!lock_.exchange(true, std::memory_order_acquire))
break;
while (lock_.load(std::memory_order_relaxed))
__builtin_ia32_pause();
}
}
void unlock() noexcept {
lock_.store(false, std::memory_order_release);
}
private:
alignas(4 * sizeof(std::max_align_t)) std::atomic_bool lock_ = false;
};
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
typedef uint32_t int_type;
#include "string_cnt_ub.h" // union std::array plus buffer allocation
inline constexpr size_t MAX_THREADS = 64;
inline constexpr size_t SSO_LENGTH = 12;
using string_cnt = string_cnt_t<SSO_LENGTH, MAX_THREADS>;
namespace std {
template<> struct hash<string_cnt> {
std::size_t operator()(const string_cnt& v) const noexcept { return v.hash(); };
};
}
using vec_str_int_type = std::vector<string_cnt>;
using map_str_int_type = emhash6::HashMap<string_cnt, uint8_t>; // value unused
// Mimic the Perl get_properties subroutine ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// convert positive number from string to uint32_t
inline uint32_t fast_atoll64(const char* str)
{
uint32_t val = 0;
uint8_t digit;
while ((digit = uint8_t(*str++ - '0')) <= 9)
val = val * 10 + digit;
return val;
}
// Helper function to find a character.
inline char* find_char(char* first, char* last, char c)
{
while (first != last) {
if (*first == c) break;
++first;
}
return first;
}
// Limit chunk size and line length.
inline constexpr size_t CHUNK_SIZE = 32768;
inline constexpr size_t MAX_LINE_LEN = 255;
inline constexpr size_t NUM_MAPS = 1 << 12;
static int64_t get_properties(
const char* fname, // in : the input file name
const int nthds, // in : the number of threads
auto& L, // in : the locks array
auto& M) // inout : the maps array
{
int64_t num_lines = 0;
std::ifstream fin(fname, std::ifstream::binary);
if (!fin.is_open()) {
std::cerr << "Error opening '" << fname << "' : " << strerror(errno) << '\n';
return num_lines;
}
#pragma omp parallel reduction(+:num_lines)
{
std::string buf;
buf.resize(CHUNK_SIZE + MAX_LINE_LEN + 1, '\0');
int tid = omp_get_thread_num();
while (fin.good()) {
size_t len = 0;
// Read the next chunk serially.
#pragma omp critical
{
fin.read(&buf[0], CHUNK_SIZE);
if ((len = fin.gcount()) > 0) {
if (buf[len - 1] != '\n' && fin.getline(&buf[len], MAX_LINE_LEN)) {
// Getline discards the newline char and appends null char.
// Therefore, change '\0' to '\n'.
len += fin.gcount();
buf[len - 1] = '\n';
}
}
}
if (!len)
break;
buf[len] = '\0';
char *first = &buf[0];
char *last = &buf[len];
// Process max Nthreads chunks concurrently.
while (first < last) {
char* beg_ptr{first};
char* end_ptr{find_char(first, last, '\n')};
char* found = find_char(beg_ptr, end_ptr, '\t');
first = end_ptr + 1;
if (found == end_ptr)
continue;
assert(*found == '\t');
int_type count = fast_atoll64(found + 1);
string_cnt s{tid, beg_ptr, (size_t)(found - beg_ptr), count};
if (nthds == 1) {
auto [it, success] = M[0].try_emplace(std::move(s), 0);
if (!success) {
((string_cnt&)(it->first)).incr(count);
s.mem_rollback(tid);
}
}
else {
size_t hv = s.hash();
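// The lock/map index comes from bits 16..27 of the hash (swap the 16-bit
// halves of the low 32 bits, then take modulo NUM_MAPS), so it is drawn
// from different bits than the low ones a hash table typically buckets on.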
size_t idx = ( ((hv & 0x000000000000ffffULL) << 16) |
((hv & 0x00000000ffff0000ULL) >> 16) ) % NUM_MAPS;
L[idx].lock();
auto [it, success] = M[idx].try_emplace(std::move(s), 0);
if (!success) {
((string_cnt&)(it->first)).incr(count);
s.mem_rollback(tid);
}
L[idx].unlock();
}
++num_lines;
}
}
}
fin.close();
// std::cerr << "getprops done\n";
return num_lines;
}
// Output subroutine ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
size_t divide_up(size_t dividend, size_t divisor)
{
if (dividend % divisor)
return (size_t)(dividend / divisor) + 1;
else
return (size_t)(dividend / divisor);
}
static void out_properties(
const int nthds, // in : the number of threads
vec_str_int_type& vec) // in : the vector to output
{
size_t num_chunks = divide_up(vec.size(), CHUNK_SIZE);
#ifdef _OPENMP
int nthds_out = std::min(nthds, 32);
#pragma omp parallel for ordered schedule(static, 1) num_threads(nthds_out)
#endif
for (size_t chunk_id = 1; chunk_id <= num_chunks; ++chunk_id) {
std::string str(""); str.reserve(2048 * 1024);
auto it = vec.begin() + (chunk_id - 1) * CHUNK_SIZE;
auto it2 = vec.begin() + std::min(vec.size(), chunk_id * CHUNK_SIZE);
for (; it != it2; ++it) {
str.append(it->getstr());
str.append("\t", 1);
str.append(std::to_string(it->getcnt()));
str.append("\n", 1);
}
#pragma omp ordered
std::cout << str << std::flush;
}
}
typedef std::chrono::high_resolution_clock high_resolution_clock;
typedef std::chrono::high_resolution_clock::time_point time_point;
typedef std::chrono::milliseconds milliseconds;
double elaspe_time(time_point cend, time_point cstart) {
return double (
std::chrono::duration_cast<milliseconds>(cend - cstart).count()
) * 1e-3;
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
int main(int argc, char* argv[])
{
if (argc < 2) {
if (argc > 0)
std::cerr << "usage: llil4emh file1 file2 ... >out.txt\n";
return 1;
}
std::cerr << std::setprecision(3) << std::setiosflags(std::ios::fixed);
std::cerr << "llil4emh start\n";
#ifdef _OPENMP
std::cerr << "use OpenMP\n";
#else
std::cerr << "don't use OpenMP\n";
#endif
#if USE_BOOST_PARALLEL_SORT == 0
std::cerr << "don't use boost sort\n";
#else
std::cerr << "use boost sort\n";
#endif
time_point cstart1, cend1, cstart2, cend2, cstart3, cend3s, cend3;
cstart1 = high_resolution_clock::now();
#ifdef _OPENMP
// Determine the number of threads.
const char* env_nthds = std::getenv("NUM_THREADS");
int nthds = ( env_nthds && strlen(env_nthds) )
? ::atoi(env_nthds)
: std::thread::hardware_concurrency();
nthds = std::min((size_t)nthds, MAX_THREADS);
omp_set_dynamic(false);
omp_set_num_threads(nthds);
omp_set_max_active_levels(1);
int nthds_move = std::min(nthds, 16);
#else
int nthds = 1;
int nthds_move = 1;
#endif
// Get the list of input files from the command line
int nfiles = argc - 1;
char** fname = &argv[1];
// Store the properties into a vector
vec_str_int_type propvec;
int64_t num_lines = 0;
int64_t num_keys = 0;
{
// Enclose the shared L and M arrays in a block so their destructors run
// and memory is released immediately after the scope exits.
spinlock_mutex L[NUM_MAPS];
map_str_int_type M[NUM_MAPS];
for (int i = 0; i < nfiles; ++i)
num_lines += get_properties(fname[i], nthds, L, M);
for (size_t i = 0; i < NUM_MAPS; ++i)
num_keys += M[i].size();
cend1 = high_resolution_clock::now();
double ctaken1 = elaspe_time(cend1, cstart1);
std::cerr << "get properties " << std::setw(8) << ctaken1 << " secs\n";
if (num_keys == 0) {
std::cerr << "No work, exiting...\n";
return 1;
}
cstart2 = high_resolution_clock::now();
if (nthds == 1) {
propvec.reserve(num_keys);
for (auto const& x : M[0])
propvec.emplace_back(std::move(const_cast<string_cnt&>(x.first)));
M[0].clear();
}
else {
propvec.resize(num_keys);
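// Compute each map's starting offset in propvec (a running prefix sum),
// then move the keys from all maps into their slices in parallel.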
std::array<vec_str_int_type::iterator, NUM_MAPS> I;
I[0] = propvec.begin();
for (size_t i = 1; i < NUM_MAPS; ++i)
I[i] = I[i-1] + M[i-1].size();
#pragma omp parallel for schedule(static, 1) num_threads(nthds_move)
for (size_t i = 0; i < NUM_MAPS; ++i) {
auto it = I[i];
for (auto const& x : M[i])
*it++ = std::move(const_cast<string_cnt&>(x.first));
map_str_int_type().swap(M[i]); // swap vs. clear to reclaim mem early
}
}
cend2 = high_resolution_clock::now();
double ctaken2 = elaspe_time(cend2, cstart2);
std::cerr << "map to vector " << std::setw(8) << ctaken2 << " secs\n";
}
cstart3 = high_resolution_clock::now();
// Sort the vector by (count) in reverse order, (name) in lexical order
auto reverse_order = [](const string_cnt& left, const string_cnt& right) {
int_type left_cnt = left.getcnt(), right_cnt = right.getcnt();
return left_cnt != right_cnt ? left_cnt > right_cnt : left < right;
};
#if USE_BOOST_PARALLEL_SORT == 0
// Standard sort
std::sort(propvec.begin(), propvec.end(), reverse_order);
#else
// Parallel sort
boost::sort::block_indirect_sort(
propvec.begin(), propvec.end(), reverse_order,
nthds
);
#endif
cend3s = high_resolution_clock::now();
// Output the sorted vector
out_properties(nthds, propvec);
cend3 = high_resolution_clock::now();
double ctaken = elaspe_time(cend3, cstart1);
double ctaken3s = elaspe_time(cend3s, cstart3);
double ctaken3o = elaspe_time(cend3, cend3s);
std::cerr << "vector stable sort " << std::setw(8) << ctaken3s << " secs\n";
std::cerr << "write stdout " << std::setw(8) << ctaken3o << " secs\n";
std::cerr << "total time " << std::setw(8) << ctaken << " secs\n";
std::cerr << " count lines " << num_lines << "\n";
std::cerr << " count unique " << propvec.size() << "\n";
// Free dynamically allocated memory in parallel.
#pragma omp parallel for schedule(static, 200000) num_threads(nthds)
for (size_t i = 0; i < propvec.size(); i++)
propvec[i].free();
// Hack to see Private Bytes in Windows Task Manager
// (uncomment next line so process doesn't exit too quickly)
// std::this_thread::sleep_for(milliseconds(9000));
return 0;
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// llil4hmap_buf.cc (using string_cnt_ub.h)
// A phmap::flat_hash_set demonstration.
// https://gist.github.com/marioroy/d02881b96b20fa1adde4388b3e216163
//
// April 25, 2024
// Based on llil3m.cpp https://perlmonks.com/?node_id=11149482
// Original challenge https://perlmonks.com/?node_id=11147822
// and summary https://perlmonks.com/?node_id=11150293
// Other demonstrations https://perlmonks.com/?node_id=11149907
//
// Authors
// Mario Roy - C++ demonstration with parallel capabilities
// eyepopslikeamosquito - Co-author, learning C++ at PerlMonks.com
// Gregory Popovitch - Further changes and simplification
//
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// OpenMP Little Book - https://nanxiao.gitbooks.io/openmp-little-book
//
// Obtain the parallel hashmap library (required dependency):
// git clone --depth=1 https://github.com/greg7mdp/parallel-hashmap
//
// Compile on Linux (clang++ or g++):
// clang++ -o llil4hmap_buf -std=c++20 -fopenmp -Wall -O3 llil4hmap_buf.cc -I./parallel-hashmap
//
// On macOS, use g++-12 from https://brew.sh (installation: brew install gcc@12).
// The g++ command also works with mingw C++ compiler (https://sourceforge.net/projects/mingw-w64)
// that comes bundled with Strawberry Perl (C:\Strawberry\c\bin\g++.exe).
//
// Obtain gen-llil.pl and gen-long-llil.pl from https://perlmonks.com/?node_id=11148681
// perl gen-llil.pl big1.txt 200 3 1
// perl gen-llil.pl big2.txt 200 3 1
// perl gen-llil.pl big3.txt 200 3 1
//
// To make random input, obtain shuffle.pl from https://perlmonks.com/?node_id=11149800
// perl shuffle.pl big1.txt >tmp && mv tmp big1.txt
// perl shuffle.pl big2.txt >tmp && mv tmp big2.txt
// perl shuffle.pl big3.txt >tmp && mv tmp big3.txt
//
// Example run: llil4hmap big1.txt big2.txt big3.txt >out.txt
// NUM_THREADS=3 llil4hmap ...
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#include <cassert>
#include <cstdio>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <compare>
#include <chrono>
#include <string>
#include <string_view>
#include <array>
#include <vector>
#include <thread>
#include <execution>
#include <atomic>
#include <iomanip>
#include <iostream>
#include <fstream>
#include <parallel_hashmap/phmap.h>
static_assert(sizeof(size_t) == sizeof(int64_t), "size_t too small, need a 64-bit compile");
// Specify 0/1 to use boost's parallel sorting algorithm; faster than __gnu_parallel::sort.
// https://www.boost.org/doc/libs/1_85_0/libs/sort/doc/html/sort/parallel.html
// https://www.boost.org/doc/libs/1_85_0/libs/sort/doc/papers/block_indirect_sort_en.pdf
// This requires the boost header files: e.g. devpkg-boost bundle on Clear Linux.
// Note: Another option is downloading and unpacking Boost locally.
// (no need to build it because the bits we use are header file only)
#define USE_BOOST_PARALLEL_SORT 1
#if USE_BOOST_PARALLEL_SORT
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-parameter"
#pragma clang diagnostic ignored "-Wshadow"
#include <boost/sort/sort.hpp>
#pragma clang diagnostic pop
#else
#include <boost/sort/sort.hpp>
#endif
#endif
#ifdef _OPENMP
#include <omp.h>
#endif
class spinlock_mutex {
// https://rigtorp.se/spinlock/
// https://vorbrodt.blog/2019/02/12/fast-mutex/
public:
// Assignment is disabled.
spinlock_mutex& operator=(const spinlock_mutex& rhs) = delete;
void lock() noexcept {
for (;;) {
if (!lock_.exchange(true, std::memory_order_acquire))
break;
while (lock_.load(std::memory_order_relaxed))
__builtin_ia32_pause();
}
}
void unlock() noexcept {
lock_.store(false, std::memory_order_release);
}
private:
alignas(4 * sizeof(std::max_align_t)) std::atomic_bool lock_ = false;
};
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
typedef uint32_t int_type;
#include "string_cnt_ub.h" // union std::array plus buffer allocation
inline constexpr size_t MAX_THREADS = 64;
inline constexpr size_t SSO_LENGTH = 12;
using string_cnt = string_cnt_t<SSO_LENGTH, MAX_THREADS>;
namespace std {
template<> struct hash<string_cnt> {
std::size_t operator()(const string_cnt& v) const noexcept { return v.hash(); };
};
}
using vec_str_int_type = std::vector<string_cnt>;
using map_str_int_type = phmap::flat_hash_set<string_cnt>;
// Mimic the Perl get_properties subroutine ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// convert positive number from string to uint32_t
inline uint32_t fast_atoll64(const char* str)
{
uint32_t val = 0;
uint8_t digit;
while ((digit = uint8_t(*str++ - '0')) <= 9)
val = val * 10 + digit;
return val;
}
// Helper function to find a character.
inline char* find_char(char* first, char* last, char c)
{
while (first != last) {
if (*first == c) break;
++first;
}
return first;
}
// Limit chunk size and line length.
inline constexpr size_t CHUNK_SIZE = 32768;
inline constexpr size_t MAX_LINE_LEN = 255;
inline constexpr size_t NUM_MAPS = 1 << 12;
static int64_t get_properties(
const char* fname, // in : the input file name
const int nthds, // in : the number of threads
auto& L, // in : the locks array
auto& M) // inout : the maps array
{
int64_t num_lines = 0;
std::ifstream fin(fname, std::ifstream::binary);
if (!fin.is_open()) {
std::cerr << "Error opening '" << fname << "' : " << strerror(errno) << '\n';
return num_lines;
}
#pragma omp parallel reduction(+:num_lines)
{
std::string buf;
buf.resize(CHUNK_SIZE + MAX_LINE_LEN + 1, '\0');
int tid = omp_get_thread_num();
while (fin.good()) {
size_t len = 0;
// Read the next chunk serially.
#pragma omp critical
{
fin.read(&buf[0], CHUNK_SIZE);
if ((len = fin.gcount()) > 0) {
if (buf[len - 1] != '\n' && fin.getline(&buf[len], MAX_LINE_LEN)) {
// Getline discards the newline char and appends null char.
// Therefore, change '\0' to '\n'.
len += fin.gcount();
buf[len - 1] = '\n';
}
}
}
if (!len)
break;
buf[len] = '\0';
char *first = &buf[0];
char *last = &buf[len];
// Process max Nthreads chunks concurrently.
while (first < last) {
char* beg_ptr{first};
char* end_ptr{find_char(first, last, '\n')};
char* found = find_char(beg_ptr, end_ptr, '\t');
first = end_ptr + 1;
if (found == end_ptr)
continue;
assert(*found == '\t');
int_type count = fast_atoll64(found + 1);
string_cnt s{tid, beg_ptr, (size_t)(found - beg_ptr), count};
if (nthds == 1) {
auto [it, success] = M[0].emplace(std::move(s));
if (!success) {
((string_cnt&)(*it)).incr(count);
s.mem_rollback(tid);
}
}
else {
size_t hv = s.hash();
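// The lock/map index comes from bits 16..27 of the hash (swap the 16-bit
// halves of the low 32 bits, then take modulo NUM_MAPS), so it is drawn
// from different bits than the low ones a hash table typically buckets on.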
size_t idx = ( ((hv & 0x000000000000ffffULL) << 16) |
((hv & 0x00000000ffff0000ULL) >> 16) ) % NUM_MAPS;
L[idx].lock();
auto [it, success] = M[idx].emplace(std::move(s));
if (!success) {
((string_cnt&)(*it)).incr(count);
s.mem_rollback(tid);
}
L[idx].unlock();
}
++num_lines;
}
}
}
fin.close();
// std::cerr << "getprops done\n";
return num_lines;
}
// Output subroutine ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
size_t divide_up(size_t dividend, size_t divisor)
{
if (dividend % divisor)
return (size_t)(dividend / divisor) + 1;
else
return (size_t)(dividend / divisor);
}
static void out_properties(
const int nthds, // in : the number of threads
vec_str_int_type& vec) // in : the vector to output
{
size_t num_chunks = divide_up(vec.size(), CHUNK_SIZE);
#ifdef _OPENMP
int nthds_out = std::min(nthds, 32);
#pragma omp parallel for ordered schedule(static, 1) num_threads(nthds_out)
#endif
for (size_t chunk_id = 1; chunk_id <= num_chunks; ++chunk_id) {
std::string str(""); str.reserve(2048 * 1024);
auto it = vec.begin() + (chunk_id - 1) * CHUNK_SIZE;
auto it2 = vec.begin() + std::min(vec.size(), chunk_id * CHUNK_SIZE);
for (; it != it2; ++it) {
str.append(it->getstr());
str.append("\t", 1);
str.append(std::to_string(it->getcnt()));
str.append("\n", 1);
}
#pragma omp ordered
std::cout << str << std::flush;
}
}
typedef std::chrono::high_resolution_clock high_resolution_clock;
typedef std::chrono::high_resolution_clock::time_point time_point;
typedef std::chrono::milliseconds milliseconds;
double elaspe_time(time_point cend, time_point cstart) {
return double (
std::chrono::duration_cast<milliseconds>(cend - cstart).count()
) * 1e-3;
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
int main(int argc, char* argv[])
{
if (argc < 2) {
if (argc > 0)
std::cerr << "usage: llil4hmap file1 file2 ... >out.txt\n";
return 1;
}
std::cerr << std::setprecision(3) << std::setiosflags(std::ios::fixed);
std::cerr << "llil4hmap start\n";
#ifdef _OPENMP
std::cerr << "use OpenMP\n";
#else
std::cerr << "don't use OpenMP\n";
#endif
#if USE_BOOST_PARALLEL_SORT == 0
std::cerr << "don't use boost sort\n";
#else
std::cerr << "use boost sort\n";
#endif
time_point cstart1, cend1, cstart2, cend2, cstart3, cend3s, cend3;
cstart1 = high_resolution_clock::now();
#ifdef _OPENMP
// Determine the number of threads.
const char* env_nthds = std::getenv("NUM_THREADS");
int nthds = ( env_nthds && strlen(env_nthds) )
? ::atoi(env_nthds)
: std::thread::hardware_concurrency();
nthds = std::min((size_t)nthds, MAX_THREADS);
omp_set_dynamic(false);
omp_set_num_threads(nthds);
omp_set_max_active_levels(1);
int nthds_move = std::min(nthds, 16);
#else
int nthds = 1;
int nthds_move = 1;
#endif
// Get the list of input files from the command line
int nfiles = argc - 1;
char** fname = &argv[1];
// Store the properties into a vector
vec_str_int_type propvec;
int64_t num_lines = 0;
int64_t num_keys = 0;
{
// Enclose the shared L and M arrays in a block so their destructors run
// and memory is released immediately after the scope exits.
spinlock_mutex L[NUM_MAPS];
map_str_int_type M[NUM_MAPS];
for (int i = 0; i < nfiles; ++i)
num_lines += get_properties(fname[i], nthds, L, M);
for (size_t i = 0; i < NUM_MAPS; ++i)
num_keys += M[i].size();
cend1 = high_resolution_clock::now();
double ctaken1 = elaspe_time(cend1, cstart1);
std::cerr << "get properties " << std::setw(8) << ctaken1 << " secs\n";
if (num_keys == 0) {
std::cerr << "No work, exiting...\n";
return 1;
}
cstart2 = high_resolution_clock::now();
if (nthds == 1) {
propvec.reserve(num_keys);
for (auto const& x : M[0])
propvec.emplace_back(std::move(const_cast<string_cnt&>(x)));
M[0].clear();
}
else {
propvec.resize(num_keys);
std::array<vec_str_int_type::iterator, NUM_MAPS> I;
I[0] = propvec.begin();
for (size_t i = 1; i < NUM_MAPS; ++i)
I[i] = I[i-1] + M[i-1].size();
#pragma omp parallel for schedule(static, 1) num_threads(nthds_move)
for (size_t i = 0; i < NUM_MAPS; ++i) {
auto it = I[i];
for (auto const& x : M[i])
*it++ = std::move(const_cast<string_cnt&>(x));
map_str_int_type().swap(M[i]); // swap vs. clear to reclaim mem early
}
}
cend2 = high_resolution_clock::now();
double ctaken2 = elaspe_time(cend2, cstart2);
std::cerr << "map to vector " << std::setw(8) << ctaken2 << " secs\n";
}
cstart3 = high_resolution_clock::now();
// Sort the vector by (count) in reverse order, (name) in lexical order
auto reverse_order = [](const string_cnt& left, const string_cnt& right) {
int_type left_cnt = left.getcnt(), right_cnt = right.getcnt();
return left_cnt != right_cnt ? left_cnt > right_cnt : left < right;
};
#if USE_BOOST_PARALLEL_SORT == 0
// Standard sort
std::sort(propvec.begin(), propvec.end(), reverse_order);
#else
// Parallel sort
boost::sort::block_indirect_sort(
propvec.begin(), propvec.end(), reverse_order,
nthds
);
#endif
cend3s = high_resolution_clock::now();
// Output the sorted vector
out_properties(nthds, propvec);
cend3 = high_resolution_clock::now();
double ctaken = elaspe_time(cend3, cstart1);
double ctaken3s = elaspe_time(cend3s, cstart3);
double ctaken3o = elaspe_time(cend3, cend3s);
std::cerr << "vector stable sort " << std::setw(8) << ctaken3s << " secs\n";
std::cerr << "write stdout " << std::setw(8) << ctaken3o << " secs\n";
std::cerr << "total time " << std::setw(8) << ctaken << " secs\n";
std::cerr << " count lines " << num_lines << "\n";
std::cerr << " count unique " << propvec.size() << "\n";
// Free dynamically allocated memory in parallel.
#pragma omp parallel for schedule(static, 200000) num_threads(nthds)
for (size_t i = 0; i < propvec.size(); i++)
propvec[i].free();
// Hack to see Private Bytes in Windows Task Manager
// (uncomment next line so process doesn't exit too quickly)
// std::this_thread::sleep_for(milliseconds(9000));
return 0;
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// llil4map_buf.cc (using string_cnt_ub.h)
// A phmap::parallel_flat_hash_set demonstration.
// https://gist.github.com/marioroy/d02881b96b20fa1adde4388b3e216163
//
// April 25, 2024
// Based on llil3m.cpp https://perlmonks.com/?node_id=11149482
// Original challenge https://perlmonks.com/?node_id=11147822
// and summary https://perlmonks.com/?node_id=11150293
// Other demonstrations https://perlmonks.com/?node_id=11149907
//
// Authors
// Mario Roy - C++ demonstration with parallel capabilities
// eyepopslikeamosquito - Co-author, learning C++ at PerlMonks.com
// Gregory Popovitch - Further changes and simplification
//
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// OpenMP Little Book - https://nanxiao.gitbooks.io/openmp-little-book
//
// Obtain the parallel hashmap library (required dependency):
// git clone --depth=1 https://github.com/greg7mdp/parallel-hashmap
//
// Compile on Linux (clang++ or g++):
// clang++ -o llil4map_buf -std=c++20 -fopenmp -Wall -O3 llil4map_buf.cc -I./parallel-hashmap
//
// On macOS, use g++-12 from https://brew.sh (installation: brew install gcc@12).
// The g++ command also works with mingw C++ compiler (https://sourceforge.net/projects/mingw-w64)
// that comes bundled with Strawberry Perl (C:\Strawberry\c\bin\g++.exe).
//
// Obtain gen-llil.pl and gen-long-llil.pl from https://perlmonks.com/?node_id=11148681
// perl gen-llil.pl big1.txt 200 3 1
// perl gen-llil.pl big2.txt 200 3 1
// perl gen-llil.pl big3.txt 200 3 1
//
// To make random input, obtain shuffle.pl from https://perlmonks.com/?node_id=11149800
// perl shuffle.pl big1.txt >tmp && mv tmp big1.txt
// perl shuffle.pl big2.txt >tmp && mv tmp big2.txt
// perl shuffle.pl big3.txt >tmp && mv tmp big3.txt
//
// Example run: llil4map big1.txt big2.txt big3.txt >out.txt
// NUM_THREADS=3 llil4map ...
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#include <cassert>
#include <cstdio>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <compare>
#include <chrono>
#include <string>
#include <string_view>
#include <array>
#include <vector>
#include <thread>
#include <execution>
#include <atomic>
#include <iomanip>
#include <iostream>
#include <fstream>
#include <parallel_hashmap/phmap.h>
static_assert(sizeof(size_t) == sizeof(int64_t), "size_t too small, need a 64-bit compile");
// Specify 0/1 to use boost's parallel sorting algorithm; faster than __gnu_parallel::sort.
// https://www.boost.org/doc/libs/1_85_0/libs/sort/doc/html/sort/parallel.html
// https://www.boost.org/doc/libs/1_85_0/libs/sort/doc/papers/block_indirect_sort_en.pdf
// This requires the boost header files: e.g. devpkg-boost bundle on Clear Linux.
// Note: Another option is downloading and unpacking Boost locally.
// (no need to build it because the bits we use are header file only)
#define USE_BOOST_PARALLEL_SORT 1
#if USE_BOOST_PARALLEL_SORT
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-parameter"
#pragma clang diagnostic ignored "-Wshadow"
#include <boost/sort/sort.hpp>
#pragma clang diagnostic pop
#else
#include <boost/sort/sort.hpp>
#endif
#endif
#ifdef _OPENMP
#include <omp.h>
#endif
class spinlock_mutex {
// https://rigtorp.se/spinlock/
// https://vorbrodt.blog/2019/02/12/fast-mutex/
public:
// Assignment is disabled.
spinlock_mutex& operator=(const spinlock_mutex& rhs) = delete;
void lock() noexcept {
for (;;) {
if (!lock_.exchange(true, std::memory_order_acquire))
break;
while (lock_.load(std::memory_order_relaxed))
__builtin_ia32_pause();
}
}
void unlock() noexcept {
lock_.store(false, std::memory_order_release);
}
private:
alignas(4 * sizeof(std::max_align_t)) std::atomic_bool lock_ = false;
};
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
typedef uint32_t int_type;
#include "string_cnt_ub.h" // union std::array plus buffer allocation
inline constexpr size_t MAX_THREADS = 64;
inline constexpr size_t SSO_LENGTH = 12;
using string_cnt = string_cnt_t<SSO_LENGTH, MAX_THREADS>;
namespace std {
template<> struct hash<string_cnt> {
std::size_t operator()(const string_cnt& v) const noexcept { return v.hash(); };
};
}
using string_cnt_vector_t = std::vector<string_cnt>;
// declare the type of the parallel_flat_hash_set with spinlock mutexes
using string_cnt_set_t = phmap::parallel_flat_hash_set<
string_cnt,
phmap::priv::hash_default_hash<string_cnt>,
phmap::priv::hash_default_eq<string_cnt>,
phmap::priv::Allocator<string_cnt>,
12,
spinlock_mutex
>;
// Mimic the Perl get_properties subroutine ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// convert positive number from string to uint32_t
inline uint32_t fast_atoll64(const char* str)
{
uint32_t val = 0;
uint8_t digit;
while ((digit = uint8_t(*str++ - '0')) <= 9)
val = val * 10 + digit;
return val;
}
// Helper function to find a character.
inline char* find_char(char* first, char* last, char c)
{
while (first != last) {
if (*first == c) break;
++first;
}
return first;
}
// Limit chunk size and line length.
inline constexpr size_t CHUNK_SIZE = 32768;
inline constexpr size_t MAX_LINE_LEN = 255;
static int64_t get_properties(
const char* fname, // in : the input file name
string_cnt_set_t& set_ret) // inout : the set to be updated
{
int64_t num_lines = 0;
std::ifstream fin(fname, std::ifstream::binary);
if (!fin.is_open()) {
std::cerr << "Error opening '" << fname << "' : " << strerror(errno) << '\n';
return num_lines;
}
#pragma omp parallel reduction(+:num_lines)
{
std::string buf;
buf.resize(CHUNK_SIZE + MAX_LINE_LEN + 1, '\0');
int tid = omp_get_thread_num();
while (fin.good()) {
size_t len = 0;
// Read the next chunk serially.
#pragma omp critical
{
fin.read(&buf[0], CHUNK_SIZE);
if ((len = fin.gcount()) > 0) {
if (buf[len - 1] != '\n' && fin.getline(&buf[len], MAX_LINE_LEN)) {
// Getline discards the newline char and appends null char.
// Therefore, change '\0' to '\n'.
len += fin.gcount();
buf[len - 1] = '\n';
}
}
}
if (!len)
break;
buf[len] = '\0';
char *first = &buf[0];
char *last = &buf[len];
// Process max Nthreads chunks concurrently.
while (first < last) {
char* beg_ptr{first};
char* end_ptr{find_char(first, last, '\n')};
char* found = find_char(beg_ptr, end_ptr, '\t');
first = end_ptr + 1;
if (found == end_ptr)
continue;
assert(*found == '\t');
int_type count = fast_atoll64(found + 1);
string_cnt s{tid, beg_ptr, (size_t)(found - beg_ptr), count};
// Use lazy_emplace to modify the set while the mutex is locked.
set_ret.lazy_emplace_l(
s,
[&](string_cnt_set_t::value_type& p) {
p.incr(count); // called only when key was already present
s.mem_rollback(tid);
},
[&](const string_cnt_set_t::constructor& ctor) {
// construct value_type in place when key not present
// long strings will be stored in dynamic memory storage
ctor(std::move(s));
}
);
++num_lines;
}
}
}
fin.close();
// std::cerr << "getprops done\n";
return num_lines;
}
// Output subroutine ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
size_t divide_up(size_t dividend, size_t divisor)
{
if (dividend % divisor)
return (size_t)(dividend / divisor) + 1;
else
return (size_t)(dividend / divisor);
}
static void out_properties(
const int nthds, // in : the number of threads
string_cnt_vector_t& vec) // in : the vector to output
{
size_t num_chunks = divide_up(vec.size(), CHUNK_SIZE);
#ifdef _OPENMP
int nthds_out = std::min(nthds, 32);
#pragma omp parallel for ordered schedule(static, 1) num_threads(nthds_out)
#endif
for (size_t chunk_id = 1; chunk_id <= num_chunks; ++chunk_id) {
std::string str(""); str.reserve(2048 * 1024);
auto it = vec.begin() + (chunk_id - 1) * CHUNK_SIZE;
auto it2 = vec.begin() + std::min(vec.size(), chunk_id * CHUNK_SIZE);
for (; it != it2; ++it) {
str.append(it->getstr());
str.append("\t", 1);
str.append(std::to_string(it->getcnt()));
str.append("\n", 1);
}
#pragma omp ordered
std::cout << str << std::flush;
}
}
typedef std::chrono::high_resolution_clock high_resolution_clock;
typedef std::chrono::high_resolution_clock::time_point time_point;
typedef std::chrono::milliseconds milliseconds;
double elaspe_time(time_point cend, time_point cstart) {
return double (
std::chrono::duration_cast<milliseconds>(cend - cstart).count()
) * 1e-3;
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
int main(int argc, char* argv[])
{
if (argc < 2) {
if (argc > 0)
std::cerr << "usage: llil4map file1 file2 ... >out.txt\n";
return 1;
}
std::cerr << std::setprecision(3) << std::setiosflags(std::ios::fixed);
std::cerr << "llil4map start\n";
#ifdef _OPENMP
std::cerr << "use OpenMP\n";
#else
std::cerr << "don't use OpenMP\n";
#endif
#if USE_BOOST_PARALLEL_SORT == 0
std::cerr << "don't use boost sort\n";
#else
std::cerr << "use boost sort\n";
#endif
time_point cstart1, cend1, cstart2, cend2, cstart3, cend3s, cend3;
cstart1 = high_resolution_clock::now();
#ifdef _OPENMP
// Determine the number of threads.
const char* env_nthds = std::getenv("NUM_THREADS");
int nthds = ( env_nthds && strlen(env_nthds) )
? ::atoi(env_nthds)
: std::thread::hardware_concurrency();
nthds = std::min((size_t)nthds, MAX_THREADS);
omp_set_dynamic(false);
omp_set_num_threads(nthds);
omp_set_max_active_levels(1);
int nthds_move = std::min(nthds, 16);
#else
int nthds = 1;
int nthds_move = 1;
#endif
// Get the list of input files from the command line
int nfiles = argc - 1;
char** fname = &argv[1];
// Store the properties into a vector
string_cnt_vector_t propvec;
int64_t num_lines = 0;
{
// Enclose the set inside a block so its destructor runs and memory is
// released immediately after the scope exits.
string_cnt_set_t set;
for (int i = 0; i < nfiles; ++i)
num_lines += get_properties(fname[i], set);
cend1 = high_resolution_clock::now();
double ctaken1 = elaspe_time(cend1, cstart1);
std::cerr << "get properties " << std::setw(8) << ctaken1 << " secs\n";
if (set.size() == 0) {
std::cerr << "No work, exiting...\n";
return 1;
}
cstart2 = high_resolution_clock::now();
if (nthds == 1) {
propvec.reserve(set.size());
for (auto& x : set)
propvec.push_back(std::move(const_cast<string_cnt&>(x)));
set.clear();
}
else {
propvec.resize(set.size());
std::vector<string_cnt_vector_t::iterator> I(set.subcnt());
auto cur = propvec.begin();
for (size_t i = 0; i < set.subcnt(); ++i) {
set.with_submap(i, [&](const string_cnt_set_t::EmbeddedSet& set) {
I[i] = cur;
cur += set.size();
});
}
#pragma omp parallel for schedule(static, 1) num_threads(nthds_move)
for (size_t i = 0; i < set.subcnt(); ++i) {
set.with_submap_m(i, [&](string_cnt_set_t::EmbeddedSet& set) {
auto it = I[i];
for (auto& x : set)
*it++ = std::move(const_cast<string_cnt&>(x)); // force move
// reset the set (no longer needed) to reclaim memory early
set = string_cnt_set_t::EmbeddedSet();
});
}
}
cend2 = high_resolution_clock::now();
double ctaken2 = elaspe_time(cend2, cstart2);
std::cerr << "map to vector " << std::setw(8) << ctaken2 << " secs\n";
}
cstart3 = high_resolution_clock::now();
// Sort the vector by (count) in reverse order, (name) in lexical order
auto reverse_order = [](const string_cnt& left, const string_cnt& right) {
int_type left_cnt = left.getcnt(), right_cnt = right.getcnt();
return left_cnt != right_cnt ? left_cnt > right_cnt : left < right;
};
#if USE_BOOST_PARALLEL_SORT == 0
// Standard sort
std::sort(propvec.begin(), propvec.end(), reverse_order);
#else
// Parallel sort
boost::sort::block_indirect_sort(
propvec.begin(), propvec.end(), reverse_order,
nthds
);
#endif
cend3s = high_resolution_clock::now();
// Output the sorted vector
out_properties(nthds, propvec);
cend3 = high_resolution_clock::now();
double ctaken = elaspe_time(cend3, cstart1);
double ctaken3s = elaspe_time(cend3s, cstart3);
double ctaken3o = elaspe_time(cend3, cend3s);
std::cerr << "vector stable sort " << std::setw(8) << ctaken3s << " secs\n";
std::cerr << "write stdout " << std::setw(8) << ctaken3o << " secs\n";
std::cerr << "total time " << std::setw(8) << ctaken << " secs\n";
std::cerr << " count lines " << num_lines << "\n";
std::cerr << " count unique " << propvec.size() << "\n";
// Free dynamically allocated memory in parallel.
#pragma omp parallel for schedule(static, 200000) num_threads(nthds)
for (size_t i = 0; i < propvec.size(); i++)
propvec[i].free();
// Hack to see Private Bytes in Windows Task Manager
// (uncomment next line so process doesn't exit too quickly)
// std::this_thread::sleep_for(milliseconds(9000));
return 0;
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// llil4umap_buf.cc (using string_cnt_ub.h)
// A std::unordered_set demonstration.
// https://gist.github.com/marioroy/d02881b96b20fa1adde4388b3e216163
//
// April 25, 2024
// Based on llil3m.cpp https://perlmonks.com/?node_id=11149482
// Original challenge https://perlmonks.com/?node_id=11147822
// and summary https://perlmonks.com/?node_id=11150293
// Other demonstrations https://perlmonks.com/?node_id=11149907
//
// Authors
// Mario Roy - C++ demonstration with parallel capabilities
// eyepopslikeamosquito - Co-author, learning C++ at PerlMonks.com
// Gregory Popovitch - Further changes and simplification
//
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// OpenMP Little Book - https://nanxiao.gitbooks.io/openmp-little-book
//
// Compile on Linux (clang++ or g++):
// clang++ -o llil4umap_buf -std=c++20 -fopenmp -Wall -O3 llil4umap_buf.cc
//
// On macOS, use g++-12 from https://brew.sh (installation: brew install gcc@12).
// The g++ command also works with mingw C++ compiler (https://sourceforge.net/projects/mingw-w64)
// that comes bundled with Strawberry Perl (C:\Strawberry\c\bin\g++.exe).
//
// Obtain gen-llil.pl and gen-long-llil.pl from https://perlmonks.com/?node_id=11148681
// perl gen-llil.pl big1.txt 200 3 1
// perl gen-llil.pl big2.txt 200 3 1
// perl gen-llil.pl big3.txt 200 3 1
//
// To make random input, obtain shuffle.pl from https://perlmonks.com/?node_id=11149800
// perl shuffle.pl big1.txt >tmp && mv tmp big1.txt
// perl shuffle.pl big2.txt >tmp && mv tmp big2.txt
// perl shuffle.pl big3.txt >tmp && mv tmp big3.txt
//
// Example run: llil4umap big1.txt big2.txt big3.txt >out.txt
// NUM_THREADS=3 llil4umap ...
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#include <cassert>
#include <cstdio>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <compare>
#include <chrono>
#include <string>
#include <string_view>
#include <array>
#include <vector>
#include <thread>
#include <execution>
#include <atomic>
#include <iomanip>
#include <iostream>
#include <fstream>
#include <unordered_set>
static_assert(sizeof(size_t) == sizeof(int64_t), "size_t too small, need a 64-bit compile");
// Specify 0/1 to use boost's parallel sorting algorithm; faster than __gnu_parallel::sort.
// https://www.boost.org/doc/libs/1_85_0/libs/sort/doc/html/sort/parallel.html
// https://www.boost.org/doc/libs/1_85_0/libs/sort/doc/papers/block_indirect_sort_en.pdf
// This requires the boost header files: e.g. devpkg-boost bundle on Clear Linux.
// Note: Another option is downloading and unpacking Boost locally.
// (no need to build it because the bits we use are header file only)
#define USE_BOOST_PARALLEL_SORT 1
#if USE_BOOST_PARALLEL_SORT
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-parameter"
#pragma clang diagnostic ignored "-Wshadow"
#include <boost/sort/sort.hpp>
#pragma clang diagnostic pop
#else
#include <boost/sort/sort.hpp>
#endif
#endif
#ifdef _OPENMP
#include <omp.h>
#endif
class spinlock_mutex {
// https://rigtorp.se/spinlock/
// https://vorbrodt.blog/2019/02/12/fast-mutex/
public:
// Assignment is disabled.
spinlock_mutex& operator=(const spinlock_mutex& rhs) = delete;
void lock() noexcept {
for (;;) {
if (!lock_.exchange(true, std::memory_order_acquire))
break;
while (lock_.load(std::memory_order_relaxed))
__builtin_ia32_pause();
}
}
void unlock() noexcept {
lock_.store(false, std::memory_order_release);
}
private:
alignas(4 * sizeof(std::max_align_t)) std::atomic_bool lock_ = false;
};
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
typedef uint32_t int_type;
#include "string_cnt_ub.h" // union std::array plus buffer allocation
inline constexpr size_t MAX_THREADS = 64;
inline constexpr size_t SSO_LENGTH = 12;
using string_cnt = string_cnt_t<SSO_LENGTH, MAX_THREADS>;
namespace std {
template<> struct hash<string_cnt> {
std::size_t operator()(const string_cnt& v) const noexcept { return v.hash(); };
};
}
using vec_str_int_type = std::vector<string_cnt>;
using map_str_int_type = std::unordered_set<string_cnt>;
// Mimic the Perl get_properties subroutine ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// convert positive number from string to uint32_t
inline uint32_t fast_atoll64(const char* str)
{
uint32_t val = 0;
uint8_t digit;
while ((digit = uint8_t(*str++ - '0')) <= 9)
val = val * 10 + digit;
return val;
}
// Helper function to find a character.
inline char* find_char(char* first, char* last, char c)
{
while (first != last) {
if (*first == c) break;
++first;
}
return first;
}
// Limit chunk size and line length.
inline constexpr size_t CHUNK_SIZE = 32768;
inline constexpr size_t MAX_LINE_LEN = 255;
inline constexpr size_t NUM_MAPS = 1 << 12;
static int64_t get_properties(
const char* fname, // in : the input file name
const int nthds, // in : the number of threads
auto& L, // in : the locks array
auto& M) // inout : the maps array
{
int64_t num_lines = 0;
std::ifstream fin(fname, std::ifstream::binary);
if (!fin.is_open()) {
std::cerr << "Error opening '" << fname << "' : " << strerror(errno) << '\n';
return num_lines;
}
#pragma omp parallel reduction(+:num_lines)
{
std::string buf;
buf.resize(CHUNK_SIZE + MAX_LINE_LEN + 1, '\0');
int tid = omp_get_thread_num();
while (fin.good()) {
size_t len = 0;
// Read the next chunk serially.
#pragma omp critical
{
fin.read(&buf[0], CHUNK_SIZE);
if ((len = fin.gcount()) > 0) {
if (buf[len - 1] != '\n' && fin.getline(&buf[len], MAX_LINE_LEN)) {
// Getline discards the newline char and appends null char.
// Therefore, change '\0' to '\n'.
len += fin.gcount();
buf[len - 1] = '\n';
}
}
}
if (!len)
break;
buf[len] = '\0';
char *first = &buf[0];
char *last = &buf[len];
// Process max Nthreads chunks concurrently.
while (first < last) {
char* beg_ptr{first};
char* end_ptr{find_char(first, last, '\n')};
char* found = find_char(beg_ptr, end_ptr, '\t');
first = end_ptr + 1;
if (found == end_ptr)
continue;
assert(*found == '\t');
int_type count = fast_atoll64(found + 1);
string_cnt s{tid, beg_ptr, (size_t)(found - beg_ptr), count};
if (nthds == 1) {
auto [it, success] = M[0].emplace(std::move(s));
if (!success) {
((string_cnt&)(*it)).incr(count);
s.mem_rollback(tid);
}
}
else {
size_t hv = s.hash();
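// The lock/map index comes from bits 16..27 of the hash (swap the 16-bit
// halves of the low 32 bits, then take modulo NUM_MAPS), so it is drawn
// from different bits than the low ones a hash table typically buckets on.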
size_t idx = ( ((hv & 0x000000000000ffffULL) << 16) |
((hv & 0x00000000ffff0000ULL) >> 16) ) % NUM_MAPS;
L[idx].lock();
auto [it, success] = M[idx].emplace(std::move(s));
if (!success) {
((string_cnt&)(*it)).incr(count);
s.mem_rollback(tid);
}
L[idx].unlock();
}
++num_lines;
}
}
}
fin.close();
// std::cerr << "getprops done\n";
return num_lines;
}
// Output subroutine ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
size_t divide_up(size_t dividend, size_t divisor)
{
if (dividend % divisor)
return (size_t)(dividend / divisor) + 1;
else
return (size_t)(dividend / divisor);
}
static void out_properties(
const int nthds, // in : the number of threads
vec_str_int_type& vec) // in : the vector to output
{
size_t num_chunks = divide_up(vec.size(), CHUNK_SIZE);
#ifdef _OPENMP
int nthds_out = std::min(nthds, 32);
#pragma omp parallel for ordered schedule(static, 1) num_threads(nthds_out)
#endif
for (size_t chunk_id = 1; chunk_id <= num_chunks; ++chunk_id) {
std::string str(""); str.reserve(2048 * 1024);
auto it = vec.begin() + (chunk_id - 1) * CHUNK_SIZE;
auto it2 = vec.begin() + std::min(vec.size(), chunk_id * CHUNK_SIZE);
for (; it != it2; ++it) {
str.append(it->getstr());
str.append("\t", 1);
str.append(std::to_string(it->getcnt()));
str.append("\n", 1);
}
#pragma omp ordered
std::cout << str << std::flush;
}
}
typedef std::chrono::high_resolution_clock high_resolution_clock;
typedef std::chrono::high_resolution_clock::time_point time_point;
typedef std::chrono::milliseconds milliseconds;
double elaspe_time(time_point cend, time_point cstart) {
return double (
std::chrono::duration_cast<milliseconds>(cend - cstart).count()
) * 1e-3;
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
int main(int argc, char* argv[])
{
if (argc < 2) {
if (argc > 0)
std::cerr << "usage: llil4umap file1 file2 ... >out.txt\n";
return 1;
}
std::cerr << std::setprecision(3) << std::setiosflags(std::ios::fixed);
std::cerr << "llil4umap start\n";
#ifdef _OPENMP
std::cerr << "use OpenMP\n";
#else
std::cerr << "don't use OpenMP\n";
#endif
#if USE_BOOST_PARALLEL_SORT == 0
std::cerr << "don't use boost sort\n";
#else
std::cerr << "use boost sort\n";
#endif
time_point cstart1, cend1, cstart2, cend2, cstart3, cend3s, cend3;
cstart1 = high_resolution_clock::now();
#ifdef _OPENMP
// Determine the number of threads.
const char* env_nthds = std::getenv("NUM_THREADS");
int nthds = ( env_nthds && strlen(env_nthds) )
? ::atoi(env_nthds)
: std::thread::hardware_concurrency();
nthds = std::min((size_t)nthds, MAX_THREADS);
omp_set_dynamic(false);
omp_set_num_threads(nthds);
omp_set_max_active_levels(1);
#else
int nthds = 1;
#endif
// Get the list of input files from the command line
int nfiles = argc - 1;
char** fname = &argv[1];
// Store the properties into a vector
vec_str_int_type propvec;
int64_t num_lines = 0;
int64_t num_keys = 0;
{
// Enclose the shared L and M arrays in a block so their destructors run
// and memory is released immediately after the scope exits.
spinlock_mutex L[NUM_MAPS];
map_str_int_type M[NUM_MAPS];
for (size_t i = 0; i < NUM_MAPS; ++i)
M[i].reserve(50000);
for (int i = 0; i < nfiles; ++i)
num_lines += get_properties(fname[i], nthds, L, M);
for (size_t i = 0; i < NUM_MAPS; ++i)
num_keys += M[i].size();
cend1 = high_resolution_clock::now();
double ctaken1 = elaspe_time(cend1, cstart1);
std::cerr << "get properties " << std::setw(8) << ctaken1 << " secs\n";
if (num_keys == 0) {
std::cerr << "No work, exiting...\n";
return 1;
}
cstart2 = high_resolution_clock::now();
if (nthds == 1) {
propvec.reserve(num_keys);
for (auto const& x : M[0])
propvec.emplace_back(std::move(const_cast<string_cnt&>(x)));
M[0].clear();
}
else {
propvec.resize(num_keys);
std::array<vec_str_int_type::iterator, NUM_MAPS> I;
I[0] = propvec.begin();
for (size_t i = 1; i < NUM_MAPS; ++i)
I[i] = I[i-1] + M[i-1].size();
#pragma omp parallel for schedule(static, 1) num_threads(nthds)
for (size_t i = 0; i < NUM_MAPS; ++i) {
auto it = I[i];
for (auto const& x : M[i])
*it++ = std::move(const_cast<string_cnt&>(x));
map_str_int_type().swap(M[i]); // swap vs. clear to reclaim mem early
}
}
cend2 = high_resolution_clock::now();
double ctaken2 = elaspe_time(cend2, cstart2);
std::cerr << "map to vector " << std::setw(8) << ctaken2 << " secs\n";
}
cstart3 = high_resolution_clock::now();
// Sort the vector by (count) in reverse order, (name) in lexical order
auto reverse_order = [](const string_cnt& left, const string_cnt& right) {
int_type left_cnt = left.getcnt(), right_cnt = right.getcnt();
return left_cnt != right_cnt ? left_cnt > right_cnt : left < right;
};
#if USE_BOOST_PARALLEL_SORT == 0
// Standard sort
std::sort(propvec.begin(), propvec.end(), reverse_order);
#else
// Parallel sort
boost::sort::block_indirect_sort(
propvec.begin(), propvec.end(), reverse_order,
#ifdef __NVCOMPILER_LLVM__
std::min(nthds, 32)
#else
nthds
#endif
);
#endif
cend3s = high_resolution_clock::now();
// Output the sorted vector
out_properties(nthds, propvec);
cend3 = high_resolution_clock::now();
double ctaken = elaspe_time(cend3, cstart1);
double ctaken3s = elaspe_time(cend3s, cstart3);
double ctaken3o = elaspe_time(cend3, cend3s);
std::cerr << "vector stable sort " << std::setw(8) << ctaken3s << " secs\n";
std::cerr << "write stdout " << std::setw(8) << ctaken3o << " secs\n";
std::cerr << "total time " << std::setw(8) << ctaken << " secs\n";
std::cerr << " count lines " << num_lines << "\n";
std::cerr << " count unique " << propvec.size() << "\n";
// Free dynamically allocated memory in parallel.
#pragma omp parallel for schedule(static, 200000) num_threads(nthds)
for (size_t i = 0; i < propvec.size(); i++)
propvec[i].free();
// Hack to see Private Bytes in Windows Task Manager
// (uncomment next line so process doesn't exit too quickly)
// std::this_thread::sleep_for(milliseconds(9000));
return 0;
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// llil4vec_buf.cc (using string_cnt_ub.h)
// A std::vector demonstration using OpenMP directives.
// https://gist.github.com/marioroy/d02881b96b20fa1adde4388b3e216163
//
// April 25, 2024
// Based on llil3m.cpp https://perlmonks.com/?node_id=11149482
// Original challenge https://perlmonks.com/?node_id=11147822
// and summary https://perlmonks.com/?node_id=11150293
// Other demonstrations https://perlmonks.com/?node_id=11149907
//
// Authors
// Mario Roy - C++ demonstration with parallel capabilities
// eyepopslikeamosquito - Co-author, learning C++ at PerlMonks.com
// Gregory Popovitch - Further changes and simplification
//
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// OpenMP Little Book - https://nanxiao.gitbooks.io/openmp-little-book
//
// Compile on Linux (clang++ or g++):
// clang++ -o llil4vec_buf -std=c++20 -fopenmp -Wall -O3 llil4vec_buf.cc
//
// On macOS, use g++-12 from https://brew.sh (installation: brew install gcc@12).
// The g++ command also works with mingw C++ compiler (https://sourceforge.net/projects/mingw-w64)
// that comes bundled with Strawberry Perl (C:\Strawberry\c\bin\g++.exe).
//
// Obtain gen-llil.pl and gen-long-llil.pl from https://perlmonks.com/?node_id=11148681
// perl gen-llil.pl big1.txt 200 3 1
// perl gen-llil.pl big2.txt 200 3 1
// perl gen-llil.pl big3.txt 200 3 1
//
// To make random input, obtain shuffle.pl from https://perlmonks.com/?node_id=11149800
// perl shuffle.pl big1.txt >tmp && mv tmp big1.txt
// perl shuffle.pl big2.txt >tmp && mv tmp big2.txt
// perl shuffle.pl big3.txt >tmp && mv tmp big3.txt
//
// Example run: llil4vec big1.txt big2.txt big3.txt >out.txt
// NUM_THREADS=3 llil4vec ...
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#include <cassert>
#include <cstdio>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <compare>
#include <chrono>
#include <string>
#include <string_view>
#include <array>
#include <vector>
#include <thread>
#include <execution>
#include <iomanip>
#include <iostream>
#include <fstream>
static_assert(sizeof(size_t) == sizeof(int64_t), "size_t too small, need a 64-bit compile");
// Specify 0/1 to use boost's parallel sorting algorithm; faster than __gnu_parallel::sort.
// https://www.boost.org/doc/libs/1_85_0/libs/sort/doc/html/sort/parallel.html
// https://www.boost.org/doc/libs/1_85_0/libs/sort/doc/papers/block_indirect_sort_en.pdf
// This requires the boost header files: e.g. devpkg-boost bundle on Clear Linux.
// Note: Another option is downloading and unpacking Boost locally.
// (no need to build it because the bits we use are header file only)
#define USE_BOOST_PARALLEL_SORT 1
#if USE_BOOST_PARALLEL_SORT
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-parameter"
#pragma clang diagnostic ignored "-Wshadow"
#include <boost/sort/sort.hpp>
#pragma clang diagnostic pop
#else
#include <boost/sort/sort.hpp>
#endif
#endif
#ifdef _OPENMP
#include <omp.h>
#endif
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
typedef uint32_t int_type;
#include "string_cnt_ub.h" // union std::array plus buffer allocation
inline constexpr size_t MAX_THREADS = 64;
inline constexpr size_t SSO_LENGTH = 12;
using string_cnt = string_cnt_t<SSO_LENGTH, MAX_THREADS>;
using string_cnt_vector_t = std::vector<string_cnt>;
// Mimic the Perl get_properties subroutine ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// convert positive number from string to uint32_t
inline uint32_t fast_atoll64(const char* str)
{
uint32_t val = 0;
uint8_t digit;
while ((digit = uint8_t(*str++ - '0')) <= 9)
val = val * 10 + digit;
return val;
}
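// For example, fast_atoll64("123\n") returns 123: the loop stops at the first
// non-digit byte ('\n' - '0' wraps past 9 as uint8_t), so the count field of a
// "name\tcount\n" line can be parsed in place without null-terminating it first.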
// Helper function to find a character.
inline char* find_char(char* first, char* last, char c)
{
while (first != last) {
if (*first == c) break;
++first;
}
return first;
}
static int64_t get_properties(
const char* fname, // in : the input file name
string_cnt_vector_t& vec_ret) // inout : the vector to be updated
{
#ifdef _OPENMP
int tid = omp_get_thread_num();
#else
int tid = 0;
#endif
int64_t num_lines = 0;
std::ifstream fin(fname, std::ios::binary);
if (!fin.is_open()) {
std::cerr << "Error opening '" << fname << "' : " << strerror(errno) << '\n';
return num_lines;
}
fin.seekg(0, std::ios::end); // Get the size of the file
std::streampos fsize = fin.tellg();
fin.seekg(0, std::ios::beg);
std::vector<char> buf(1 + fsize); // Read the whole file into the buffer
fin.read(&buf[0], fsize);
fin.close();
char* first = &buf[0];
char* last = &buf[fsize];
// Iterate through the buffer
while (first < last) {
char* beg_ptr{first};
char* end_ptr{find_char(first, last, '\n')};
char* found = find_char(beg_ptr, end_ptr, '\t');
first = end_ptr + 1;
if (found == end_ptr)
continue;
assert(*found == '\t');
int_type count = fast_atoll64(found + 1);
size_t klen = found - beg_ptr;
string_cnt s{tid, beg_ptr, klen, count};
vec_ret.emplace_back(std::move(s));
++num_lines;
}
// std::cerr << "getprops done\n";
return num_lines;
}
// Output subroutine ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
inline constexpr size_t CHUNK_SIZE = 32768;
size_t divide_up(size_t dividend, size_t divisor)
{
if (dividend % divisor)
return (size_t)(dividend / divisor) + 1;
else
return (size_t)(dividend / divisor);
}
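// E.g. divide_up(100000, 32768) == 4 (ceiling division), so 100000 sorted
// entries are written as 4 output chunks of at most CHUNK_SIZE lines each.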
static void out_properties(
const int nthds, // in : the number of threads
string_cnt_vector_t& vec) // in : the vector to output
{
size_t num_chunks = divide_up(vec.size(), CHUNK_SIZE);
#ifdef _OPENMP
int nthds_out = std::min(nthds, 32);
#pragma omp parallel for ordered schedule(static, 1) num_threads(nthds_out)
#endif
for (size_t chunk_id = 1; chunk_id <= num_chunks; ++chunk_id) {
std::string str(""); str.reserve(2048 * 1024);
auto it = vec.begin() + (chunk_id - 1) * CHUNK_SIZE;
auto it2 = vec.begin() + std::min(vec.size(), chunk_id * CHUNK_SIZE);
for (; it != it2; ++it) {
str.append(it->getstr());
str.append("\t", 1);
str.append(std::to_string(it->getcnt()));
str.append("\n", 1);
}
#pragma omp ordered
std::cout << str << std::flush;
}
}
// Reduce a vector range (tally adjacent count fields of duplicate key names)
static void reduce_vec(
const int nthds, // in : the number of threads
auto& vec) // in : the vector to reduce
{
if (vec.size() == 0)
return;
// Preferred for fixed-length-only version.
auto it1 = vec.begin(); auto itr = it1; auto itw = it1;
auto it2 = vec.end();
auto curr = itr;
for (++itr; itr != it2; ++itr) {
if (*itr == *curr) {
curr->incr(itr->getcnt());
}
else {
if (itw != curr)
*itw = std::move(*curr);
++itw;
curr = itr;
}
}
if (itw != curr)
*itw = std::move(*curr);
vec.resize(std::distance(it1, ++itw));
}
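// Worked example (illustrative): given a key-sorted vector holding
// {"apple", 3} {"apple", 2} {"bird", 7}, reduce_vec tallies the adjacent
// duplicates in place, leaving {"apple", 5} {"bird", 7} and resizing 3 -> 2.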
typedef std::chrono::high_resolution_clock high_resolution_clock;
typedef std::chrono::high_resolution_clock::time_point time_point;
typedef std::chrono::milliseconds milliseconds;
double elaspe_time(time_point cend, time_point cstart) {
return double (
std::chrono::duration_cast<milliseconds>(cend - cstart).count()
) * 1e-3;
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
int main(int argc, char* argv[])
{
if (argc < 2) {
if (argc > 0)
std::cerr << "usage: llil4vec file1 file2 ... >out.txt\n";
return 1;
}
std::cerr << std::setprecision(3) << std::setiosflags(std::ios::fixed);
std::cerr << "llil4vec start\n";
#ifdef _OPENMP
std::cerr << "use OpenMP\n";
#else
std::cerr << "don't use OpenMP\n";
#endif
#if USE_BOOST_PARALLEL_SORT == 0
std::cerr << "don't use boost sort\n";
#else
std::cerr << "use boost sort\n";
#endif
time_point cstart1, cend1, cstart2, cend2, cstart3, cend3r, cend3s, cend3;
cstart1 = high_resolution_clock::now();
#ifdef _OPENMP
// Determine the number of threads.
const char* env_nthds = std::getenv("NUM_THREADS");
int nthds = ( env_nthds && strlen(env_nthds) )
? ::atoi(env_nthds)
: std::thread::hardware_concurrency();
nthds = std::min((size_t)nthds, MAX_THREADS);
omp_set_dynamic(false);
omp_set_num_threads(nthds);
#else
int nthds = 1;
#endif
// Get the list of input files from the command line
int nfiles = argc - 1;
char** fname = &argv[1];
// Create the vector of properties
string_cnt_vector_t propvec;
int64_t num_lines = 0;
// Run parallel, depending on the number of threads
if (nthds == 1 || nfiles == 1) {
for (int i = 0; i < nfiles; ++i)
num_lines += get_properties(fname[i], propvec);
}
#ifdef _OPENMP
else {
#pragma omp parallel for schedule(static, 1) reduction(+:num_lines)
for (int i = 0; i < nfiles; ++i) {
string_cnt_vector_t locvec;
num_lines += get_properties(fname[i], locvec);
#pragma omp critical
{
// Append local vector to propvec
auto it = locvec.begin();
auto it2 = locvec.end();
for (; it != it2; ++it)
propvec.emplace_back(std::move(*it));
}
}
}
#endif
cend1 = high_resolution_clock::now();
double ctaken1 = elaspe_time(cend1, cstart1);
std::cerr << "get properties " << std::setw(8) << ctaken1 << " secs\n";
if (!propvec.size()) {
std::cerr << "No work, exiting...\n";
return 1;
}
cstart2 = high_resolution_clock::now();
// Needs to be sorted by word for later sum of adjacent count fields to work
auto reverse_order1 = [](const string_cnt& left, const string_cnt& right) {
return (left < right);
};
#if USE_BOOST_PARALLEL_SORT == 0
std::sort(propvec.begin(), propvec.end(), reverse_order1);
#else
boost::sort::block_indirect_sort(
propvec.begin(), propvec.end(), reverse_order1,
nthds
);
#endif
cend2 = high_resolution_clock::now();
double ctaken2 = elaspe_time(cend2, cstart2);
std::cerr << "sort properties " << std::setw(8) << ctaken2 << " secs\n";
cstart3 = high_resolution_clock::now();
// Reduce in-place (tally adjacent count fields of duplicate key names)
reduce_vec(nthds, propvec);
cend3r = high_resolution_clock::now();
// Sort the vector by (count) in reverse order, (name) in lexical order
auto reverse_order2 = [](const string_cnt& left, const string_cnt& right) {
return left.getcnt() != right.getcnt()
? left.getcnt() > right.getcnt()
: left < right;
};
#if USE_BOOST_PARALLEL_SORT == 0
// Standard sort
std::sort(propvec.begin(), propvec.end(), reverse_order2);
#else
// Parallel sort
boost::sort::block_indirect_sort(
propvec.begin(), propvec.end(), reverse_order2,
std::min(nthds, 32)
);
#endif
cend3s = high_resolution_clock::now();
// Output the sorted vector
out_properties(nthds, propvec);
cend3 = high_resolution_clock::now();
double ctaken = elaspe_time(cend3, cstart1);
double ctaken3r = elaspe_time(cend3r, cstart3);
double ctaken3s = elaspe_time(cend3s, cend3r);
double ctaken3o = elaspe_time(cend3, cend3s);
std::cerr << "vector reduce " << std::setw(8) << ctaken3r << " secs\n";
std::cerr << "vector stable sort " << std::setw(8) << ctaken3s << " secs\n";
std::cerr << "write stdout " << std::setw(8) << ctaken3o << " secs\n";
std::cerr << "total time " << std::setw(8) << ctaken << " secs\n";
std::cerr << " count lines " << num_lines << "\n";
std::cerr << " count unique " << propvec.size() << "\n";
// Free dynamically allocated memory in parallel.
#pragma omp parallel for schedule(static, 200000) num_threads(nthds)
for (size_t i = 0; i < propvec.size(); i++)
propvec[i].free();
// Hack to see Private Bytes in Windows Task Manager
// (uncomment next line so process doesn't exit too quickly)
// std::this_thread::sleep_for(milliseconds(9000));
return 0;
}
#!/usr/bin/env perl
use strict;
use warnings;
use List::Util 'shuffle';
my @arr = shuffle <>;
print @arr;
// ---------------------------------------------------------------------------------------------
// Author: Gregory Popovitch, April 8, 2024
// https://github.com/greg7mdp/parallel-hashmap/issues/238#issuecomment-2039591745
//
// Author: Mario Roy, April 23, 2024
// Store the key length with the cnt variable, improves hash().
// Make free() public, allows freeing dynamically allocated memory in parallel.
// Add union std::array to improve sorting significantly for fixed length.
// Add buffer allocation, overall lesser memory consumption (llil4map_buf.cc).
//
// Stores a string + a count
// For strings up to N-1 bytes, total space used is N + 4 bytes
// For larger strings, uses N+4 bytes + strlen(string) + 1
//
// invariants
// if extra[mark_idx], str is a valid string pointer
// if !extra[mark_idx], the N bytes starting at (const char *)(&str) store a null-terminated string
// ---------------------------------------------------------------------------------------------
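// For example, with N == 12 on a 64-bit build (sizeof(char*) == 8, as in
// llil4vec_buf.cc above): extra_sz is 4, the inline region (str + extra) is
// 12 bytes, so keys of up to 11 characters are stored inline with their '\0';
// longer keys are copied into dynamic_allocator_t and only the pointer is kept.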
#pragma once
#include <cassert>
#include <cstdint>
#include <cstring>
#include <compare>
#include <string>
#include <string_view>
#include <vector>
#include <array>
// fast memory storage for long strings
template<size_t T>
struct dynamic_allocator_t {
struct memory_buf_t {
static constexpr size_t buffer_sz = 655356; // 640*1024 - 4
char data[buffer_sz];
int pos;
memory_buf_t() { pos = 0; }
int avail() { return (int)(buffer_sz) - pos; }
};
// constructor
dynamic_allocator_t() {
for (size_t i = 0; i < T; i++)
data[i].push_back(new memory_buf_t());
}
// destructor
~dynamic_allocator_t() {
for (size_t i = 0; i < T; i++) {
std::vector<memory_buf_t*>& vec = data[i];  // reference; no need to copy the vector
for (auto buf : vec)
delete buf;  // buffers are allocated with plain new, so use delete (not delete[])
vec.clear();
}
}
// append string
char *append_str(int tid, const char *s, int len) {
memory_buf_t *buf = data[tid].back();
if (len + 1 > buf->avail()) {
buf = new memory_buf_t();
data[tid].push_back(buf);
}
char *p = &buf->data[buf->pos];
std::memcpy(p, s, len);
p[len] = '\0';
buf->pos += len + 1;
return p;
}
// the key was found in the map, thus subtract length
void subtract_pos(int tid, size_t len) {
memory_buf_t *buf = data[tid].back();
buf->pos -= len + 1;
assert(buf->pos >= 0);
}
private:
std::array<std::vector<memory_buf_t*>, T> data;
};
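// Usage sketch (illustrative only; uses the members defined above). Each thread
// id owns its own vector of buffers, so concurrent appends are safe as long as
// every thread passes a distinct tid.
//
// dynamic_allocator_t<4> alloc;                 // room for 4 thread ids
// const char* key = "a-long-key-that-does-not-fit-inline";
// char* p = alloc.append_str(0, key, (int)strlen(key));
// // p points at a null-terminated copy inside a ~640 KiB buffer; if the key
// // turns out to be a duplicate, the space just used can be handed back:
// alloc.subtract_pos(0, strlen(key));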
template<size_t N, size_t T = 1>
struct string_cnt_t {
using uint_t = uint32_t;
static_assert(N >= 12);
static constexpr size_t extra_sz = N - sizeof(char*);
static constexpr size_t mark_idx = extra_sz - 1;
union {
struct {
char * str; // key name
char extra[extra_sz];
uint_t cnt; // (count << 8) + key length
} s_;
std::array<char, N> f_;
};
static constexpr size_t buffsz = sizeof(s_.str) + sizeof(s_.extra);
// constructors ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// string_cnt_t<N> s{};
string_cnt_t() {
s_.str = nullptr;
s_.cnt = 0;
std::memset(&s_.extra, 0, extra_sz);
}
// string_cnt_t<N> s{std::string_view{"foo"}};
// string_cnt_t<N> s{std::string_view{"foo"}, 10};
string_cnt_t(const std::string_view& s, uint_t c = 0) {
set(0, s.data(), s.size(), c);
}
// string_cnt_t<N> s{tid, std::string_view{"bar"}};
// string_cnt_t<N> s{tid, std::string_view{"bar"}, 10};
string_cnt_t(int tid, const std::string_view& s, uint_t c = 0) {
set(tid, s.data(), s.size(), c); // thread safe
}
// string_cnt_t<N> s{"sun", 3};
// string_cnt_t<N> s{"sun", 3, 10};
string_cnt_t(const char *s, size_t len, uint_t c = 0) {
set(0, s, len, c);
}
// string_cnt_t<N> s{tid, "moon", 4};
// string_cnt_t<N> s{tid, "moon", 4, 10};
string_cnt_t(int tid, const char *s, size_t len, uint_t c = 0) {
set(tid, s, len, c); // thread safe
}
// string_cnt_t<N> s1{};
// string_cnt_t<N> s2{s1};
string_cnt_t(const string_cnt_t& o) {
std::memcpy(f_.data(), o.getdata(), N);
s_.cnt = o.s_.cnt;
}
// string_cnt_t<N> s1{};
// string_cnt_t<N> s2{std::move(s1)};
string_cnt_t(string_cnt_t&& o) noexcept {
if (o.s_.extra[mark_idx]) {
s_.str = o.s_.str;
o.s_.str = nullptr;
o.s_.extra[mark_idx] = 0;
s_.extra[mark_idx] = 1;
} else {
std::memcpy(f_.data(), o.getdata(), N);
}
s_.cnt = o.s_.cnt;
}
// destructor ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~string_cnt_t() { free(); }
// assignments ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// string_cnt_t<N> s1{}, s2{};
// s2 = s1;
string_cnt_t& operator=(const string_cnt_t& o) {
free();
std::memcpy(f_.data(), o.getdata(), N);
s_.cnt = o.s_.cnt;
return *this;
}
// string_cnt_t<N> s1{}, s2{};
// s2 = std::move(s1);
string_cnt_t& operator=(string_cnt_t&& o) noexcept {
free();
new (this) string_cnt_t(std::move(o));
return *this;
}
// comparisons ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// string_cnt_t<N> s1{}, s2{};
// (s1 <=> s2)
std::strong_ordering operator<=>(const string_cnt_t& o) const {
if (s_.extra[mark_idx] || o.s_.extra[mark_idx])
return std::strcmp(getstr(), o.getstr()) <=> 0;
else
return std::memcmp(getdata(), o.getdata(), N) <=> 0;
}
// string_cnt_t<N> s1{}, s2{};
// (s1 == s2)
bool operator==(const string_cnt_t& o) const {
if (s_.extra[mark_idx] || o.s_.extra[mark_idx])
return std::strcmp(getstr(), o.getstr()) == 0;
else
return std::memcmp(getdata(), o.getdata(), N) == 0;
}
// functions ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
std::size_t hash() const {
auto s = getstr();
std::string_view sv {s, (s_.cnt & 0xff) * sizeof(char)};
return std::hash<std::string_view>()(sv);
}
const char *getdata() const { return (const char *)(f_.data()); }
const char *getstr() const { return s_.extra[mark_idx] ? s_.str : (const char *)(&s_.str); }
uint8_t getlen() const { return s_.cnt & 0xff; }
uint_t getcnt() const { return s_.cnt >> 8; }
void incr(const uint_t c) { s_.cnt += (c << 8); }
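// Packing example: a 7-character key seen 3 times stores cnt == (3 << 8) + 7,
// so getlen() == 7 and getcnt() == 3; incr(2) adds (2 << 8), giving
// getcnt() == 5 while the low length byte stays untouched.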
void mem_rollback(int tid = 0) {
// called by llil4map get_properties when key was already present
if (s_.extra[mark_idx])
mem.subtract_pos(tid, s_.cnt & 0xff);
}
void free() {
if (s_.extra[mark_idx]) {
s_.str = nullptr; // string in dynamic_allocator remains
s_.extra[mark_idx] = 0;
}
}
private:
static inline dynamic_allocator_t<T> mem {}; // requires minimum c++17
void set(int tid, const char *s, size_t len, uint_t c) {
if (len == 0)
std::memset(f_.data(), 0, N);
else {
s_.cnt = (c << 8) + (len & 0xff);
if (len >= buffsz) {
s_.str = mem.append_str(tid, s, len);
s_.extra[mark_idx] = 1;
} else {
std::memset(f_.data(), 0, N);
std::memcpy(f_.data(), s, std::min(len, N - 1));
}
}
}
};
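// Minimal usage sketch (illustrative; mirrors the aliases used by the .cc
// programs above, e.g. string_cnt_t<12, 64>):
//
// using string_cnt = string_cnt_t<12, 64>;
// string_cnt a{0, "short", 5, 1};                   // inline storage, count 1
// string_cnt b{0, "a-much-longer-key-name", 22, 2}; // allocator storage, count 2
// a.incr(4);                                        // count becomes 5
// bool same = (a == b);                             // false: compares key names only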