Skip to content

Instantly share code, notes, and snippets.

@alexanderkjeldaas
Last active December 17, 2019 08:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alexanderkjeldaas/58b0cd894cf9f8397292657908046634 to your computer and use it in GitHub Desktop.
Save alexanderkjeldaas/58b0cd894cf9f8397292657908046634 to your computer and use it in GitHub Desktop.
// g++ -O3 -g -std=c++17 test.cc -lboost_filesystem -lboost_system -lpthread -lboost_iostreams -lboost_thread
#include <algorithm>
#include <bitset>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <functional>
#include <iostream>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include <unistd.h>  // _exit
// #include <experimental/barrier>
#include <boost/thread/barrier.hpp>
#include <boost/iostreams/device/mapped_file.hpp>
#include <boost/filesystem.hpp>
#define BOOST_DISABLE_ASSERTS 1
#include <boost/assert.hpp>
using namespace std;
// One fixed-width, 8-byte record of the input file. main() reinterprets the
// memory-mapped file as an array of these and derives the record count from
// sizeof(entry), so the layout must stay exactly 8 bytes with no padding.
struct entry {
    char chars[3];    // three letters; main() expects each in 'A'..'Z'
    char numbers[3];  // three digits; main() expects each in '0'..'9'
    char _cr;         // record terminator bytes, never read by the program
    char _nl;         // (presumably "\r\n", going by the names)
};
static_assert(sizeof(entry) == 8, "entry must match the 8-byte on-disk record");
static_assert(alignof(entry) == 1, "entry must be readable at any byte offset");
// Splits several index spaces across a fixed number of worker threads.
//
// For every value e in nb_elements, the half-open interval [0, e) is cut into
// nb_threads contiguous chunks of ceil(e / nb_threads) indices. Worker i is
// started as functor(range, i), where range[j] = {begin, end} is worker i's
// chunk of dataset j. Chunks are clamped to [0, e], so begin <= end always
// holds; workers that fall past the end of a small dataset get an empty chunk.
//
// Parameters:
//   nb_elements - size of each dataset to partition.
//   functor     - work item; receives one {begin, end} pair per dataset and
//                 the worker index in [0, nb_threads).
//   nb_threads  - number of workers to start. With 0 the functor is never
//                 invoked and the call returns immediately.
//
// All workers are joined before the function returns.
static
void parallel_for(std::vector<std::uintmax_t> nb_elements,
                  std::function<void (const std::vector<std::pair<std::uintmax_t,std::uintmax_t>> range, int thread)> functor,
                  unsigned nb_threads)
{
    // Nothing can run without workers; this also keeps the divisions below
    // well defined.
    if (nb_threads == 0) {
        return;
    }

    // Chunk length per dataset: ceil(e / nb_threads), written so that it
    // cannot wrap around for very large e.
    std::vector<std::uintmax_t> batch_size;
    batch_size.reserve(nb_elements.size());
    for (const auto &e : nb_elements) {
        batch_size.push_back(e / nb_threads + (e % nb_threads != 0 ? 1 : 0));
    }

    std::vector<std::thread> my_threads;
    my_threads.reserve(nb_threads);

    // Parallel execution
    for (unsigned i = 0; i < nb_threads; ++i) {
        std::vector<std::pair<std::uintmax_t,std::uintmax_t>> range;
        range.reserve(batch_size.size());
        for (std::size_t j = 0; j < batch_size.size(); ++j) {
            const std::uintmax_t total = nb_elements[j];
            const std::uintmax_t bs = batch_size[j];
            // Clamp both ends: with rounded-up chunks the last workers may
            // start at or beyond the end of the dataset.
            const std::uintmax_t lo = std::min<std::uintmax_t>(i * bs, total);
            const std::uintmax_t hi = lo + std::min<std::uintmax_t>(bs, total - lo);
            range.emplace_back(lo, hi);
        }
        my_threads.emplace_back(functor, range, i);
    }

    // Wait for every worker to finish its task.
    for (auto &t : my_threads) {
        t.join();
    }
}
// Maximum number of worker threads; the same value as the cap of 8 that
// main() applies to the detected hardware concurrency.
constexpr int max_threads = 8;
// Pointer to a 10x10 grid of 26*26*26-bit sets. Indexing such a pointer three
// times (p[a][b][c]) selects a single bitset.
using thread_local_bitsets = std::bitset<26*26*26> (*)[10][10];
int main (int argc, const char *argv[])
{
BOOST_ASSERT(argc == 2);
const char* path = argv[1];
size_t filesize = boost::filesystem::file_size(path);
boost::iostreams::mapped_file_params params;
params.path = path;
params.length = filesize;
params.offset = 0;
boost::iostreams::mapped_file_source mf;
mf.open(params);
struct entry *entries = static_cast<struct entry*>((void*)mf.data());
BOOST_ASSERT(entries != NULL);
uintmax_t nb_elements = filesize / sizeof(struct entry);
BOOST_ASSERT(nb_elements > 0);
unsigned nb_threads = thread::hardware_concurrency();
if (nb_threads == 0 || nb_threads > 8) {
nb_threads = 8;
}
cout << "Running " << nb_threads << " threads\n";
boost::barrier task_barrier(nb_threads);
thread_local_bitsets partitions[nb_threads];
parallel_for({nb_elements, 1000}, [&entries, &partitions, &task_barrier, &nb_threads](const vector<pair<uintmax_t, uintmax_t>> range, int tidx) mutable {
// Parallel init
{
partitions[tidx] = new bitset<26*26*26>[10][10][10];
}
task_barrier.count_down_and_wait();
// Parallel parse and set thread-local bitsets.
{
auto [start, end] = range[0];
for(int i = start; i < end; ++i) {
auto [c0, c1, c2] = entries[i].chars;
BOOST_ASSERT(c0 >= 'A' && c0 <= 'Z');
BOOST_ASSERT(c1 >= 'A' && c1 <= 'Z');
BOOST_ASSERT(c2 >= 'A' && c2 <= 'Z');
auto val = (c0 - 'A') * (26*26) + (c1 - 'A') * 26 + c2 - 'A';
BOOST_ASSERT(val < 26*26*26);
auto [n0, n1, n2] = entries[i].numbers;
BOOST_ASSERT(n0 >= '0' && n0 <= '9');
BOOST_ASSERT(n1 >= '0' && n1 <= '9');
BOOST_ASSERT(n2 >= '0' && n2 <= '9');
bitset<26*26*26> &bits = partitions[tidx][n0-'0'][n1-'0'][n2-'0'];
if (bits[val]) {
cout << "Found duplicate!\n" << flush;
_exit(1);
}
bits[val] = true;
}
}
task_barrier.count_down_and_wait();
// Parallel join bitsets
{
auto [start, end] = range[1];
for (int a = 0; a < 10; a++) {
for (int b = 0; b < 10; b++) {
for (int c = 0; c < 10; c++) {
// Stupid partitioning. Could short-circuit the loops above instead.
int stupid = a*100 + b*10 + c;
if (stupid < start || stupid >= end)
continue;
// We're tasked to check if partitions[*][a][b][c] contains a duplicate.
bitset<26*26*26> bits;
bits.reset();
for (int part = 0; part < nb_threads; part++) {
bits |= partitions[part][a][b][c];
}
for (int part = 0; part < nb_threads; part++) {
bits ^= partitions[part][a][b][c];
}
if (bits.any()) {
cout << "Found duplicate!\n" << flush;
_exit(1);
}
}
}
}
}
}, nb_threads);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment