Skip to content

Instantly share code, notes, and snippets.

@marioroy
Last active June 20, 2023 22:10
Show Gist options
  • Save marioroy/643cddb782c3363d9d828b5eba292524 to your computer and use it in GitHub Desktop.
Save marioroy/643cddb782c3363d9d828b5eba292524 to your computer and use it in GitHub Desktop.
C++ parallel chunking demonstrations

C++ parallel chunking demonstrations

I came across Parallel Hashmap for C++. It provides a parallel demonstration to count the number of occurrences of each word in a large text file using multiple threads. This gist includes complementary chunking variants for comparison. Results were captured on Clear Linux and Fedora 38.

Preparation

# Download Jane Austen "Pride and Prejudice".
curl https://www.gutenberg.org/files/1342/1342-0.txt -o 1342-0.txt.orig

# Concatenate "1342-0.txt.orig" 1000 times (~ 737 MB).
for i in $(seq 1 1000); do cat 1342-0.txt.orig >> /tmp/1342-0.txt.big; done
ln -sf /tmp/1342-0.txt.big 1342-0.txt

# Obtain the parallel hashmap library (required dependency).
git clone --depth=1 https://github.com/greg7mdp/parallel-hashmap

# Compile the demonstrations.
g++ -o mt_word_counter -std=c++20 -Wall -O2 mt_word_counter.cc -Iparallel-hashmap
g++ -o omp_word_counter -std=c++20 -fopenmp -Wall -O2 omp_word_counter.cc -Iparallel-hashmap
g++ -o thr_word_counter -std=c++20 -Wall -O2 thr_word_counter.cc -Iparallel-hashmap

8 threads: Clear Linux (left), Fedora 38 (right)

$ NUM_THREADS=8 ./mt_word_counter | cksum
# memory consumption ~ 1360 MB
populate arrays        0.983 secs    1.042 secs
compute word counts    1.586 secs    1.924 secs
output results         0.001 secs    0.001 secs
total time             2.571 secs    2.968 secs
2419766184 3124

$ NUM_THREADS=8 ./omp_word_counter | cksum
# memory consumption < 1 MB
compute word counts    1.575 secs    1.799 secs
output results         0.001 secs    0.001 secs
total time             1.577 secs    1.801 secs
2419766184 3124

$ NUM_THREADS=8 ./thr_word_counter | cksum
# memory consumption < 1 MB
compute word counts    1.578 secs    1.779 secs
output results         0.001 secs    0.001 secs
total time             1.580 secs    1.781 secs
2419766184 3124

24 threads: Clear Linux (left), Fedora 38 (right)

$ NUM_THREADS=24 ./mt_word_counter | cksum
# memory consumption ~ 1372 MB
populate arrays        1.056 secs    1.114 secs
compute word counts    0.543 secs    0.623 secs
output results         0.001 secs    0.001 secs
total time             1.601 secs    1.740 secs
2419766184 3124

$ NUM_THREADS=24 ./omp_word_counter | cksum
# memory consumption < 1 MB
compute word counts    0.537 secs    0.606 secs
output results         0.001 secs    0.001 secs
total time             0.538 secs    0.608 secs
2419766184 3124

$ NUM_THREADS=24 ./thr_word_counter | cksum
# memory consumption < 1 MB
compute word counts    0.537 secs    0.597 secs
output results         0.001 secs    0.001 secs
total time             0.539 secs    0.599 secs
2419766184 3124
/*
* mt_word_counter.cc
* ------------------
* Parallel non-chunking demonstration
* based on https://github.com/greg7mdp/parallel-hashmap/blob/master/examples/mt_word_counter.cc
* by Mario Roy, 2023-06-20
*
* Obtain the parallel hashmap library (required dependency):
* git clone --depth=1 https://github.com/greg7mdp/parallel-hashmap
*
* g++ -o mt_word_counter -std=c++20 -Wall -O2 mt_word_counter.cc -Iparallel-hashmap
*/
#include <chrono>
#include <iostream>
#include <iomanip>
#include <fstream>
#include <sstream>
#include <parallel_hashmap/phmap.h>
#include <parallel_hashmap/btree.h>
#include <thread>
#include <vector>
#include <algorithm>
#include <cstdlib>
using high_resolution_clock = std::chrono::high_resolution_clock;
using time_point = std::chrono::high_resolution_clock::time_point;
using milliseconds = std::chrono::milliseconds;

// Elapsed wall time between two clock samples, in seconds,
// truncated to millisecond resolution.
double elaspe_time(time_point cend, time_point cstart) {
    const milliseconds whole_ms =
        std::chrono::duration_cast<milliseconds>(cend - cstart);
    return static_cast<double>(whole_ms.count()) * 1e-3;
}
/*
 * Count the number of occurrences of each word in a large text file using
 * multiple threads. Lines are distributed round-robin into per-thread
 * vectors up front, then each thread counts words into a private map and
 * merges it into the shared concurrent map.
 */
int main() {
    std::cerr << std::setprecision(3) << std::setiosflags(std::ios::fixed);

    // download Jane Austen "Pride and Prejudice"
    // ------------------------------------------
#if 0
    if (system("curl https://www.gutenberg.org/files/1342/1342-0.txt -o 1342-0.txt") != 0) {
        std::cerr << "Error: could not retrieve test file https://www.gutenberg.org/files/1342/1342-0.txt\n";
        return 1;
    }
#endif

    const std::string filename = "1342-0.txt";

    // NUM_THREADS environment variable overrides the default of 4 threads;
    // dereference instead of strlen() so no <cstring> include is required
    const char* env_nthds = std::getenv("NUM_THREADS");
    int num_threads = (env_nthds && *env_nthds) ? std::atoi(env_nthds) : 4;
    if (num_threads < 1) num_threads = 4; // guard against "0", negative, or junk values

    // parallel_flat_hash_map_m has default internal mutex
    using Map = phmap::parallel_flat_hash_map_m<
        std::string, int,
        phmap::priv::hash_default_hash<std::string>,
        phmap::priv::hash_default_eq<std::string>,
        phmap::priv::Allocator<phmap::priv::Pair<const std::string, int>>,
        10 // 2**10 = 1024 submaps
    >;
    Map word_counts;

    time_point cstart0;
    std::vector<std::thread> threads;
    std::vector<std::vector<std::string>> lines_array;
    // BUGFIX: resize, not reserve -- reserve() only allocates capacity and
    // leaves size() == 0, so indexing lines_array[i] below was undefined behavior
    lines_array.resize(num_threads);
    {
        // populate num_threads vectors with lines from the book
        std::ifstream file(filename);
        if (!file.is_open()) {
            std::cerr << "Error: could not open file " << filename << std::endl;
            return 1;
        }
        cstart0 = high_resolution_clock::now();
        int line_idx = 0;
        std::string line;
        while (std::getline(file, line)) {
            // round-robin distribution keeps per-thread workloads balanced
            lines_array[line_idx % num_threads].push_back(std::move(line));
            ++line_idx;
        }
        time_point cend0 = high_resolution_clock::now();
        double ctaken0 = elaspe_time(cend0, cstart0);
        std::cerr << "populate arrays " << std::setw(8) << ctaken0 << " secs\n";
    }

    // run 4 threads by default, each thread processing lines from one of the vectors
    // ------------------------------------------------------------------------------
    time_point cstart1 = high_resolution_clock::now();
    threads.reserve(num_threads);

    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back( [&word_counts, &lines_array, i] {
            phmap::flat_hash_map<std::string, int> thd_counts; // thread-local map
            for (auto& line : lines_array[i]) {
                // split on any non-alphanumeric character;
                // cast to unsigned char: passing a negative char to isalnum is UB
                std::replace_if(line.begin(), line.end(), [](char c) -> bool {
                    return !std::isalnum(static_cast<unsigned char>(c));
                }, ' ');
                std::istringstream iss(line);
                std::string word;
                while (iss >> word) {
                    const auto [it, success] = thd_counts.emplace(std::move(word), 1);
                    if (!success) ++it->second;
                }
            }
            // merge the thread-local counts into the shared map
            for (const auto& o : thd_counts) {
                // use lazy_emplace to modify the map while the mutex is locked
                word_counts.lazy_emplace_l(
                    o.first,
                    [&](Map::value_type& p) {
                        // called only when key was already present
                        p.second += o.second;
                    },
                    [&](const Map::constructor& ctor) {
                        // construct value_type in place when key not present
                        // (no std::move: o.first is const, a "move" would copy anyway)
                        ctor(o.first, o.second);
                    }
                );
            }
        });
    }
    for (auto& thread : threads)
        thread.join();

    time_point cend1 = high_resolution_clock::now();
    double ctaken1 = elaspe_time(cend1, cstart1);
    std::cerr << "compute word counts " << std::setw(8) << ctaken1 << " secs\n";

    // sort by (count, word) for consistent output
    // allows changing the number of hash map submaps above
    // ----------------------------------------------------
    time_point cstart2 = high_resolution_clock::now();
    using str_int_type = std::pair<std::string, int>;
    std::vector<str_int_type> vec; vec.reserve(word_counts.size());
    for (const auto& p : word_counts) vec.emplace_back(p);
    word_counts.clear();
    std::sort(
        vec.begin(), vec.end(),
        [](const str_int_type& left, const str_int_type& right) {
            return left.second != right.second
                ? left.second < right.second
                : left.first < right.first;
        }
    );

    // print one word used at each frequency
    // -------------------------------------
    phmap::btree_map<int, std::string> result;
    for (const auto& p : vec)
        result[p.second] = p.first;
    for (const auto& p : result)
        std::cout << p.first << ": " << p.second << std::endl;

    time_point cend2 = high_resolution_clock::now();
    double ctaken2 = elaspe_time(cend2, cstart2);
    std::cerr << "output results " << std::setw(8) << ctaken2 << " secs\n";

    double ctotal = elaspe_time(cend2, cstart0);
    std::cerr << "total time " << std::setw(8) << ctotal << " secs\n";
    return 0;
}
/*
* omp_word_counter.cc
* -------------------
* Parallel chunking demonstration using OpenMP
* based on https://github.com/greg7mdp/parallel-hashmap/blob/master/examples/mt_word_counter.cc
* by Mario Roy, 2023-06-20
*
* Obtain the parallel hashmap library (required dependency):
* git clone --depth=1 https://github.com/greg7mdp/parallel-hashmap
*
* g++ -o omp_word_counter -std=c++20 -fopenmp -Wall -O2 omp_word_counter.cc -Iparallel-hashmap
*/
#include <chrono>
#include <iostream>
#include <iomanip>
#include <fstream>
#include <sstream>
#include <parallel_hashmap/phmap.h>
#include <parallel_hashmap/btree.h>
#include <omp.h>
#include <vector>
#include <algorithm>
#include <cstdlib>
using high_resolution_clock = std::chrono::high_resolution_clock;
using time_point = std::chrono::high_resolution_clock::time_point;
using milliseconds = std::chrono::milliseconds;

// Convert the interval [cstart, cend] to seconds at millisecond granularity.
double elaspe_time(time_point cend, time_point cstart) {
    const auto delta = cend - cstart;
    const auto count = std::chrono::duration_cast<milliseconds>(delta).count();
    return double(count) * 1e-3;
}
/*
 * helper function to find a character
 * (same contract as std::find: returns last when c is absent)
 */
inline constexpr char* find_char(char* first, char* last, char c) {
    for (; first != last; ++first)
        if (*first == c) return first;
    return first;
}

// longest line the chunk reader will extend past a chunk boundary
inline constexpr int MAX_LINE_LEN = 255;
// bytes read per serialized chunk request
inline constexpr int CHUNK_SIZE = 16383;
/*
 * Count the number of occurrences of each word in a large text file using
 * multiple threads. Each OpenMP thread repeatedly grabs the next chunk of
 * the file (serially, inside a critical section), extends it to a newline
 * boundary, counts words into a private map, and merges it at the end.
 */
int main() {
    std::cerr << std::setprecision(3) << std::setiosflags(std::ios::fixed);

    // download Jane Austen "Pride and Prejudice"
    // ------------------------------------------
#if 0
    if (system("curl https://www.gutenberg.org/files/1342/1342-0.txt -o 1342-0.txt") != 0) {
        std::cerr << "Error: could not retrieve test file https://www.gutenberg.org/files/1342/1342-0.txt\n";
        return 1;
    }
#endif

    const std::string filename = "1342-0.txt";

    // NUM_THREADS environment variable overrides the default of 4 threads;
    // dereference instead of strlen() so no <cstring> include is required
    const char* env_nthds = std::getenv("NUM_THREADS");
    int num_threads = (env_nthds && *env_nthds) ? std::atoi(env_nthds) : 4;
    if (num_threads < 1) num_threads = 4; // guard against "0", negative, or junk values

    // parallel_flat_hash_map_m has default internal mutex
    using Map = phmap::parallel_flat_hash_map_m<
        std::string, int,
        phmap::priv::hash_default_hash<std::string>,
        phmap::priv::hash_default_eq<std::string>,
        phmap::priv::Allocator<phmap::priv::Pair<const std::string, int>>,
        10 // 2**10 = 1024 submaps
    >;
    Map word_counts;

    std::ifstream file(filename, std::ifstream::binary);
    if (!file.is_open()) {
        std::cerr << "Error: could not open file " << filename << std::endl;
        return 1;
    }

    // run 4 threads by default, each thread processing a chunk simultaneously
    // -----------------------------------------------------------------------
    time_point cstart1 = high_resolution_clock::now();
    omp_set_dynamic(false);
    omp_set_num_threads(num_threads);

    #pragma omp parallel
    {
        phmap::flat_hash_map<std::string, int> thd_counts; // thread-local map
        // room for one chunk plus the remainder of the last line plus '\0'
        std::string buf; buf.resize(CHUNK_SIZE + MAX_LINE_LEN + 1, '\0');
        char *first, *last;
        int len;

        while (file.good()) {
            len = 0;
            // read the next chunk serially; only one thread touches the stream
            #pragma omp critical
            {
                file.read(&buf[0], CHUNK_SIZE);
                if ((len = file.gcount()) > 0) {
                    if (buf[len - 1] != '\n' && file.getline(&buf[len], MAX_LINE_LEN)) {
                        // getline discards the newline char and appends a null char;
                        // therefore, change '\0' to '\n'
                        // NOTE(review): assumes every line is shorter than MAX_LINE_LEN
                        // and the file ends with a newline -- verify for other inputs
                        len += file.gcount();
                        buf[len - 1] = '\n';
                    }
                }
            }
            if (!len) break;
            buf[len] = '\0';
            first = &buf[0];
            last = &buf[len];

            // process num_threads chunks concurrently, line by line
            while (first < last) {
                char* beg_ptr{first}; first = find_char(first, last, '\n');
                char* end_ptr{first}; ++first;
                std::string line(beg_ptr, end_ptr - beg_ptr + 1);
                // split on any non-alphanumeric character;
                // cast to unsigned char: passing a negative char to isalnum is UB
                std::replace_if(line.begin(), line.end(), [](char c) -> bool {
                    return !std::isalnum(static_cast<unsigned char>(c));
                }, ' ');
                std::istringstream iss(line);
                std::string word;
                while (iss >> word) {
                    const auto [it, success] = thd_counts.emplace(std::move(word), 1);
                    if (!success) ++it->second;
                }
            }
        }

        // merge the thread-local counts into the shared map
        for (const auto& o : thd_counts) {
            // use lazy_emplace to modify the shared map while the mutex is locked
            word_counts.lazy_emplace_l(
                o.first,
                [&](Map::value_type& p) {
                    // called only when key was already present
                    p.second += o.second;
                },
                [&](const Map::constructor& ctor) {
                    // construct value_type in place when key not present
                    // (no std::move: o.first is const, a "move" would copy anyway)
                    ctor(o.first, o.second);
                }
            );
        }
    }
    file.close();

    time_point cend1 = high_resolution_clock::now();
    double ctaken1 = elaspe_time(cend1, cstart1);
    std::cerr << "compute word counts " << std::setw(8) << ctaken1 << " secs\n";

    // sort by (count, word) for consistent output
    // allows changing the number of hash map submaps above
    // ----------------------------------------------------
    time_point cstart2 = high_resolution_clock::now();
    using str_int_type = std::pair<std::string, int>;
    std::vector<str_int_type> vec; vec.reserve(word_counts.size());
    for (const auto& p : word_counts) vec.emplace_back(p);
    word_counts.clear();
    std::sort(
        vec.begin(), vec.end(),
        [](const str_int_type& left, const str_int_type& right) {
            return left.second != right.second
                ? left.second < right.second
                : left.first < right.first;
        }
    );

    // print one word used at each frequency
    // -------------------------------------
    phmap::btree_map<int, std::string> result;
    for (const auto& p : vec)
        result[p.second] = p.first;
    for (const auto& p : result)
        std::cout << p.first << ": " << p.second << std::endl;

    time_point cend2 = high_resolution_clock::now();
    double ctaken2 = elaspe_time(cend2, cstart2);
    std::cerr << "output results " << std::setw(8) << ctaken2 << " secs\n";

    double ctotal = elaspe_time(cend2, cstart1);
    std::cerr << "total time " << std::setw(8) << ctotal << " secs\n";
    return 0;
}
/*
* thr_word_counter.cc
* -------------------
* Parallel chunking demonstration using std::thread
* based on https://github.com/greg7mdp/parallel-hashmap/blob/master/examples/mt_word_counter.cc
* by Mario Roy, 2023-06-20
*
* Obtain the parallel hashmap library (required dependency):
* git clone --depth=1 https://github.com/greg7mdp/parallel-hashmap
*
* g++ -o thr_word_counter -std=c++20 -Wall -O2 thr_word_counter.cc -Iparallel-hashmap
*/
#include <chrono>
#include <iostream>
#include <iomanip>
#include <fstream>
#include <sstream>
#include <parallel_hashmap/phmap.h>
#include <parallel_hashmap/btree.h>
#include <thread>
#include <mutex>
#include <vector>
#include <algorithm>
#include <cstdlib>
using high_resolution_clock = std::chrono::high_resolution_clock;
using time_point = std::chrono::high_resolution_clock::time_point;
using milliseconds = std::chrono::milliseconds;

// Seconds elapsed from cstart to cend, with millisecond precision.
double elaspe_time(time_point cend, time_point cstart) {
    auto whole_ms = std::chrono::duration_cast<milliseconds>(cend - cstart).count();
    return static_cast<double>(whole_ms) * 1e-3;
}
/*
 * helper function to find a character
 * (scan forward; stop at the first occurrence of c or at last)
 */
inline constexpr char* find_char(char* first, char* last, char c) {
    while (first != last && *first != c)
        ++first;
    return first;
}

// longest line the chunk reader will extend past a chunk boundary
inline constexpr int MAX_LINE_LEN = 255;
// bytes read per serialized chunk request
inline constexpr int CHUNK_SIZE = 16383;
/*
 * Count the number of occurrences of each word in a large text file using
 * multiple threads. Each std::thread repeatedly grabs the next chunk of the
 * file (serially, under a mutex), extends it to a newline boundary, counts
 * words into a private map, and merges it into the shared map at the end.
 */
int main() {
    std::cerr << std::setprecision(3) << std::setiosflags(std::ios::fixed);

    // download Jane Austen "Pride and Prejudice"
    // ------------------------------------------
#if 0
    if (system("curl https://www.gutenberg.org/files/1342/1342-0.txt -o 1342-0.txt") != 0) {
        std::cerr << "Error: could not retrieve test file https://www.gutenberg.org/files/1342/1342-0.txt\n";
        return 1;
    }
#endif

    const std::string filename = "1342-0.txt";

    // NUM_THREADS environment variable overrides the default of 4 threads;
    // dereference instead of strlen() so no <cstring> include is required
    const char* env_nthds = std::getenv("NUM_THREADS");
    int num_threads = (env_nthds && *env_nthds) ? std::atoi(env_nthds) : 4;
    if (num_threads < 1) num_threads = 4; // guard against "0", negative, or junk values

    // parallel_flat_hash_map_m has default internal mutex
    using Map = phmap::parallel_flat_hash_map_m<
        std::string, int,
        phmap::priv::hash_default_hash<std::string>,
        phmap::priv::hash_default_eq<std::string>,
        phmap::priv::Allocator<phmap::priv::Pair<const std::string, int>>,
        10 // 2**10 = 1024 submaps
    >;
    Map word_counts;

    std::ifstream file(filename, std::ifstream::binary);
    if (!file.is_open()) {
        std::cerr << "Error: could not open file " << filename << std::endl;
        return 1;
    }

    // run 4 threads by default, each thread processing a chunk simultaneously
    // -----------------------------------------------------------------------
    time_point cstart1 = high_resolution_clock::now();
    std::vector<std::thread> thds;
    thds.reserve(num_threads);
    std::mutex mutex; // serializes access to the shared input stream

    for (int i = 0; i < num_threads; ++i) { thds.emplace_back( [&word_counts, &file, &mutex] {
        phmap::flat_hash_map<std::string, int> thd_counts; // thread-local map
        // room for one chunk plus the remainder of the last line plus '\0'
        std::string buf; buf.resize(CHUNK_SIZE + MAX_LINE_LEN + 1, '\0');
        char *first, *last;
        int len;

        while (file.good()) {
            len = 0;
            // read the next chunk serially; only one thread touches the stream
            {
                std::lock_guard<std::mutex> guard(mutex);
                file.read(&buf[0], CHUNK_SIZE);
                if ((len = file.gcount()) > 0) {
                    if (buf[len - 1] != '\n' && file.getline(&buf[len], MAX_LINE_LEN)) {
                        // getline discards the newline char and appends a null char;
                        // therefore, change '\0' to '\n'
                        // NOTE(review): assumes every line is shorter than MAX_LINE_LEN
                        // and the file ends with a newline -- verify for other inputs
                        len += file.gcount();
                        buf[len - 1] = '\n';
                    }
                }
            }
            if (!len) break;
            buf[len] = '\0';
            first = &buf[0];
            last = &buf[len];

            // process num_threads chunks concurrently, line by line
            while (first < last) {
                char* beg_ptr{first}; first = find_char(first, last, '\n');
                char* end_ptr{first}; ++first;
                std::string line(beg_ptr, end_ptr - beg_ptr + 1);
                // split on any non-alphanumeric character;
                // cast to unsigned char: passing a negative char to isalnum is UB
                std::replace_if(line.begin(), line.end(), [](char c) -> bool {
                    return !std::isalnum(static_cast<unsigned char>(c));
                }, ' ');
                std::istringstream iss(line);
                std::string word;
                while (iss >> word) {
                    const auto [it, success] = thd_counts.emplace(std::move(word), 1);
                    if (!success) ++it->second;
                }
            }
        }

        // merge the thread-local counts into the shared map
        for (const auto& o : thd_counts) {
            // use lazy_emplace to modify the shared map while the mutex is locked
            word_counts.lazy_emplace_l(
                o.first,
                [&](Map::value_type& p) {
                    // called only when key was already present
                    p.second += o.second;
                },
                [&](const Map::constructor& ctor) {
                    // construct value_type in place when key not present
                    // (no std::move: o.first is const, a "move" would copy anyway)
                    ctor(o.first, o.second);
                }
            );
        }
    });}

    for (auto& thread : thds)
        thread.join();
    file.close();

    time_point cend1 = high_resolution_clock::now();
    double ctaken1 = elaspe_time(cend1, cstart1);
    std::cerr << "compute word counts " << std::setw(8) << ctaken1 << " secs\n";

    // sort by (count, word) for consistent output
    // allows changing the number of hash map submaps above
    // ----------------------------------------------------
    time_point cstart2 = high_resolution_clock::now();
    using str_int_type = std::pair<std::string, int>;
    std::vector<str_int_type> vec; vec.reserve(word_counts.size());
    for (const auto& p : word_counts) vec.emplace_back(p);
    word_counts.clear();
    std::sort(
        vec.begin(), vec.end(),
        [](const str_int_type& left, const str_int_type& right) {
            return left.second != right.second
                ? left.second < right.second
                : left.first < right.first;
        }
    );

    // print one word used at each frequency
    // -------------------------------------
    phmap::btree_map<int, std::string> result;
    for (const auto& p : vec)
        result[p.second] = p.first;
    for (const auto& p : result)
        std::cout << p.first << ": " << p.second << std::endl;

    time_point cend2 = high_resolution_clock::now();
    double ctaken2 = elaspe_time(cend2, cstart2);
    std::cerr << "output results " << std::setw(8) << ctaken2 << " secs\n";

    double ctotal = elaspe_time(cend2, cstart1);
    std::cerr << "total time " << std::setw(8) << ctotal << " secs\n";
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment