|
/* |
|
* thr_word_counter.cc |
|
* ------------------- |
|
* Parallel chunking demonstration using std::thread |
|
* based on https://github.com/greg7mdp/parallel-hashmap/blob/master/examples/mt_word_counter.cc |
|
* by Mario Roy, 2023-06-20 |
|
* |
|
* Obtain the parallel hashmap library (required dependency): |
|
* git clone --depth=1 https://github.com/greg7mdp/parallel-hashmap |
|
* |
|
* g++ -o thr_word_counter -std=c++20 -Wall -O2 thr_word_counter.cc -Iparallel-hashmap |
|
*/ |
|
|
|
#include <algorithm>
#include <cctype>
#include <chrono>
#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include <parallel_hashmap/phmap.h>
#include <parallel_hashmap/btree.h>
|
|
|
// clock and duration aliases shared by the timing code in this file
using high_resolution_clock = std::chrono::high_resolution_clock;
using time_point = high_resolution_clock::time_point;
using milliseconds = std::chrono::milliseconds;

/*
 * seconds elapsed between cstart and cend; the interval is first truncated
 * to whole milliseconds, so the result has millisecond resolution
 */
double elaspe_time(time_point cend, time_point cstart) {
    const auto elapsed_ms = std::chrono::duration_cast<milliseconds>(cend - cstart);
    return static_cast<double>(elapsed_ms.count()) * 1e-3;
}
|
|
|
/*
 * scan [first, last) for the character c
 * returns a pointer to the first match, or last when c does not occur
 */
inline constexpr char* find_char(char* first, char* last, char c) {
    for (char* cur = first; cur != last; ++cur) {
        if (*cur == c) return cur;
    }
    return last;
}
|
|
|
// buffer size handed to getline when completing the partial line that
// follows a chunk (the terminating NUL written by getline is included)
inline constexpr int MAX_LINE_LEN{255};

// number of bytes requested from the file by each bulk read
inline constexpr int CHUNK_SIZE{16383};
|
|
|
/* |
|
* count the number of occurrences of each word in a large text file using multiple threads |
|
*/ |
|
|
|
int main() { |
|
|
|
std::cerr << std::setprecision(3) << std::setiosflags(std::ios::fixed); |
|
|
|
// download Jane Austin "Pride and Prejudice" |
|
// ------------------------------------------ |
|
#if 0 |
|
if (system("curl https://www.gutenberg.org/files/1342/1342-0.txt -o 1342-0.txt") != 0) { |
|
std::cerr << "Error: could not retrieve test file https://www.gutenberg.org/files/1342/1342-0.txt\n"; |
|
return 1; |
|
} |
|
#endif |
|
|
|
const std::string filename = "1342-0.txt"; |
|
const char* env_nthds = std::getenv("NUM_THREADS"); |
|
int num_threads = (env_nthds && strlen(env_nthds)) ? ::atoi(env_nthds) : 4; |
|
|
|
// parallel_flat_hash_map_m has default internal mutex |
|
using Map = phmap::parallel_flat_hash_map_m< |
|
std::string, int, |
|
phmap::priv::hash_default_hash<std::string>, |
|
phmap::priv::hash_default_eq<std::string>, |
|
phmap::priv::Allocator<phmap::priv::Pair<const std::string, int>>, |
|
10 // 2**10 = 1024 submaps |
|
>; |
|
Map word_counts; |
|
|
|
std::ifstream file(filename, std::ifstream::binary); |
|
if (!file.is_open()) { |
|
std::cerr << "Error: could not open file " << filename << std::endl; |
|
return 1; |
|
} |
|
|
|
// run 4 threads by default, each thread processing a chunk simultaneously |
|
// ----------------------------------------------------------------------- |
|
time_point cstart1 = high_resolution_clock::now(); |
|
std::vector<std::thread> thds; |
|
thds.reserve(num_threads); |
|
std::mutex mutex; |
|
|
|
for (int i = 0; i < num_threads; ++i) { thds.emplace_back( [&word_counts, &file, &mutex] { |
|
|
|
phmap::flat_hash_map<std::string, int> thd_counts; // local map |
|
std::string buf; buf.resize(CHUNK_SIZE + MAX_LINE_LEN + 1, '\0'); |
|
char *first, *last; |
|
int len; |
|
|
|
while (file.good()) { |
|
len = 0; |
|
|
|
// read the next chunk serially |
|
{ |
|
std::lock_guard<std::mutex> guard(mutex); |
|
file.read(&buf[0], CHUNK_SIZE); |
|
if ((len = file.gcount()) > 0) { |
|
if (buf[len - 1] != '\n' && file.getline(&buf[len], MAX_LINE_LEN)) { |
|
// Getline discards the newline char and appends null char. |
|
// Therefore, change '\0' to '\n'. |
|
len += file.gcount(); |
|
buf[len - 1] = '\n'; |
|
} |
|
} |
|
} |
|
|
|
if (!len) break; |
|
buf[len] = '\0'; |
|
first = &buf[0]; |
|
last = &buf[len]; |
|
|
|
// process num_threads chunks concurrently |
|
while (first < last) { |
|
char* beg_ptr{first}; first = find_char(first, last, '\n'); |
|
char* end_ptr{first}; ++first; |
|
|
|
std::string line(beg_ptr, end_ptr - beg_ptr + 1); |
|
std::replace_if(line.begin(), line.end(), [](char c) -> bool { |
|
return !std::isalnum(c); |
|
}, ' '); |
|
|
|
std::istringstream iss(line); |
|
std::string word; |
|
|
|
while (iss >> word) { |
|
const auto [it, success] = thd_counts.emplace(std::move(word), 1); |
|
if (!success) ++it->second; |
|
} |
|
} |
|
} |
|
|
|
for (const auto& o : thd_counts) { |
|
// use lazy_emplace to modify the shared map while the mutex is locked |
|
word_counts.lazy_emplace_l( |
|
o.first, |
|
[&](Map::value_type& p) { |
|
// called only when key was already present |
|
p.second += o.second; |
|
}, |
|
[&](const Map::constructor& ctor) { |
|
// construct value_type in place when key not present |
|
ctor(std::move(o.first), o.second); |
|
} |
|
); |
|
} |
|
});} |
|
|
|
for (auto& thread : thds) |
|
thread.join(); |
|
|
|
file.close(); |
|
|
|
time_point cend1 = high_resolution_clock::now(); |
|
double ctaken1 = elaspe_time(cend1, cstart1); |
|
std::cerr << "compute word counts " << std::setw(8) << ctaken1 << " secs\n"; |
|
|
|
// sort by (count, word) for consistent output |
|
// allows changing the number of hash map submaps above |
|
// ---------------------------------------------------- |
|
time_point cstart2 = high_resolution_clock::now(); |
|
using str_int_type = std::pair<std::string, int>; |
|
|
|
std::vector<str_int_type> vec; vec.reserve(word_counts.size()); |
|
for (const auto& p : word_counts) vec.emplace_back(p); |
|
word_counts.clear(); |
|
|
|
std::sort( |
|
vec.begin(), vec.end(), |
|
[](const str_int_type& left, const str_int_type& right) { |
|
return left.second != right.second |
|
? left.second < right.second |
|
: left.first < right.first; |
|
} |
|
); |
|
|
|
// print one word used at each frequency |
|
// ------------------------------------- |
|
phmap::btree_map<int, std::string> result; |
|
for (const auto& p : vec) |
|
result[p.second] = p.first; |
|
|
|
for (const auto& p : result) |
|
std::cout << p.first << ": " << p.second << std::endl; |
|
|
|
time_point cend2 = high_resolution_clock::now(); |
|
double ctaken2 = elaspe_time(cend2, cstart2); |
|
std::cerr << "output results " << std::setw(8) << ctaken2 << " secs\n"; |
|
|
|
double ctotal = elaspe_time(cend2, cstart1); |
|
std::cerr << "total time " << std::setw(8) << ctotal << " secs\n"; |
|
|
|
return 0; |
|
} |