Skip to content

Instantly share code, notes, and snippets.

@myaut
Last active August 29, 2015 14:16
Show Gist options
  • Save myaut/94ee59d9752f524d3da8 to your computer and use it in GitHub Desktop.
Save myaut/94ee59d9752f524d3da8 to your computer and use it in GitHub Desktop.
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <functional>
#include <future>
#include <iostream>
#include <iterator>
#include <list>
#include <mutex>
#include <random>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

#include "tbb/concurrent_unordered_map.h"
// Problem size: number of random ints generated in main().
const int N = 1000;
// Chunk size: outer elements handed to each async task in async_process_chunked().
const int C = 20;
// Thread count (and per-thread iterator stride) used by threaded_process().
const int T = 20;
// Map key: a pair of values taken from the input list.
using pair = std::pair<int, int>;
using list = std::list<int>;
using list_of_lists = std::list<std::list<int>>;
// Global benchmark counter bumped by every append(); atomic because worker
// threads write it while the reporter thread in main() reads it.
std::atomic<long> append_calls(0);
/*
 * Hash functor for the (int, int) pair keys used by all maps below.
 *
 * Fix: the original XOR-ed the two element hashes, which makes (a, b)
 * and (b, a) collide and hashes every (x, x) pair to zero — degrading
 * the hash maps toward per-bucket linked-list scans.  The boost-style
 * hash_combine formula below mixes order into the result.
 * (std::pair<int, int> is spelled out, but it is exactly the file's
 * `pair` alias, so the interface is unchanged.)
 */
struct pair_hash {
std::size_t operator()(const std::pair<int, int> &x) const {
    std::size_t seed = std::hash<int>()(x.first);
    /* 0x9e3779b9 is the 32-bit golden-ratio constant from
       boost::hash_combine; the shifts spread entropy across bits. */
    seed ^= std::hash<int>()(x.second) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
    return seed;
}
};
/*
 * Baseline map with no synchronization at all.  Safe only for the
 * single-threaded "uni" test; concurrent use would be a data race.
 */
class unsafe_map_of_list_of_lists :
public std::unordered_map<pair, list_of_lists, pair_hash> {
public:
/* Record one history list under key tpp.  tp is a sink parameter:
 * taken by value and moved into the bucket.
 * Fix: tpp was `const pair&&` — a const rvalue reference, which can
 * never be moved from and rejects lvalue keys; a const lvalue
 * reference binds to both and is what was intended. */
void append(const pair& tpp, list tp) {
    ++append_calls;
    (*this)[tpp].push_back(std::move(tp));
}
};
/*
 * Mutex-protected map: one coarse lock serializes every append(), so
 * any number of threads may call it concurrently.
 */
class mutex_map_of_list_of_lists :
public std::unordered_map<pair, list_of_lists, pair_hash> {
std::mutex mapmutex;
public:
/* Record one history list under key tpp (thread-safe).
 * The atomic counter bump stays outside the critical section so the
 * lock covers only the map mutation.
 * Fix: tpp was `const pair&&` — a const rvalue reference, which can
 * never be moved from and rejects lvalue keys; a const lvalue
 * reference binds to both and is what was intended. */
void append(const pair& tpp, list tp) {
    ++append_calls;
    std::lock_guard<std::mutex> lock(mapmutex);
    (*this)[tpp].push_back(std::move(tp));
}
};
/*
 * TBB-backed map: tbb::concurrent_unordered_map makes insertion and
 * operator[] safe under concurrency without an explicit lock.
 *
 * WARNING(review): only the map itself is concurrent.  The mapped
 * value is a plain std::list, and push_back on it is NOT thread-safe.
 * Because the random input (values in [0, N/4) over N elements) is
 * guaranteed to contain duplicates, two tasks can produce the same
 * (i, j) key and race on the same list.  This is a real data race in
 * the "tbb" benchmark flavours; fixing it (e.g. per-key locking)
 * would change what this benchmark measures, so it is flagged here
 * rather than silently altered.
 */
class concurrent_map_of_list_of_lists :
public tbb::concurrent_unordered_map<pair, list_of_lists, pair_hash> {
public:
/* Record one history list under key tpp.  tp is moved into place.
 * Fix: tpp was `const pair&&` — a const rvalue reference, which can
 * never be moved from and rejects lvalue keys; a const lvalue
 * reference binds to both and is what was intended. */
void append(const pair& tpp, list tp) {
    ++append_calls;
    (*this)[tpp].push_back(std::move(tp));
}
};
template<typename M>
void unithread_process(M& map, const list& l) {
for(auto it1 = std::begin(l); it1 != std::end(l); ++it1) {
list history;
for(auto it2 = std::next(it1); it2 != std::end(l); ++it2) {
map.append(pair(*it1, *it2), history);
history.emplace_back(*it1 * *it2);
}
}
}
/* Parallel flavour: one std::async task per outer element (so up to
 * l.size() tasks in flight), each doing the same inner sweep as
 * unithread_process.  All futures are awaited before returning, so the
 * map only needs to tolerate concurrent append() calls. */
template<typename M>
void async_process(M& map, const list& l) {
std::vector<std::future<void>> futures;
futures.reserve(l.size());
/* The worker captures only the map; its starting position and the end
 * iterator arrive as arguments, so one closure serves every task. */
auto worker = [&map](decltype(std::begin(l)) head,
                     decltype(std::end(l)) end) {
    list history;
    for(auto cur = std::next(head); cur != end; ++cur) {
        map.append(pair(*head, *cur), history);
        history.emplace_back(*head * *cur);
    }
};
for(auto head = std::begin(l); head != std::end(l); ++head)
    futures.emplace_back(std::async(std::launch::async,
                                    worker, head, std::end(l)));
for(auto& f : futures)
    f.wait();
}
template<typename M>
void async_process_chunked(M& map, const list& l) {
std::vector<std::future<void>> promises;
promises.reserve(l.size());
for(auto start = std::begin(l), last = std::begin(l);
last != std::end(l); start = last) {
auto process_helper = [&map](decltype(std::begin(l)) start,
decltype(std::begin(l)) last,
decltype(std::end(l)) end) {
for(; start != last; ++start) {
list history;
for(auto it2 = std::next(start); it2 != end; ++it2) {
map.append(pair(*start, *it2), history);
history.emplace_back(*start * *it2);
}
}
};
last = start;
std::advance(last, C);
promises.push_back(std::async(std::launch::async,
process_helper, start, last, std::end(l)));
}
for(auto& future : promises)
future.wait();
}
/* Fixed thread-pool flavour: at most T threads; thread #k starts at
 * element k and strides T elements at a time (round-robin split of the
 * outer loop).  All threads are joined before returning. */
template<typename M>
void threaded_process(M& map, const list& l) {
std::vector<std::thread> threads;
threads.reserve(T);
/* One closure serves every thread; the starting iterator, the end
 * iterator, and the stride arrive as arguments. */
auto worker = [&map](decltype(std::begin(l)) cur,
                     decltype(std::end(l)) end, int stride) {
    while(cur != end) {
        list history;
        for(auto other = std::next(cur); other != end; ++other) {
            map.append(pair(*cur, *other), history);
            history.emplace_back(*cur * *other);
        }
        /* Step forward at most `stride` elements, stopping at end():
         * a bare std::advance could run past it, which is UB. */
        int steps = 0;
        while(steps < stride && cur != end) {
            ++cur;
            ++steps;
        }
    }
};
auto start_it = std::begin(l);
for(int tid = 0; tid < T && start_it != std::end(l); ++tid, ++start_it)
    threads.emplace_back(worker, start_it, std::end(l), T);
for(auto& thread : threads)
    thread.join();
}
/*
 * Entry point.  argv[1] selects the map implementation
 * ("uni" | "mutex" | "tbb"); optional argv[2] selects the work split
 * ("chunked" | "threaded"; default is one async task per element).
 * Prints elapsed wall time and the total append() call count to
 * stderr, with a background thread reporting progress once a second.
 */
int main(int argc, char* argv[]) {
    if(argc < 2) {
        std::cerr << "usage: maptest {uni|mutex|tbb} [chunked|threaded]" << std::endl;
        return 1;
    }
    std::string test_name = argv[1];
    std::string test_flavour = (argc == 3)? argv[2] : "";
    /* Generate N random numbers in [0, N/4); the narrowed range
       guarantees duplicate values, i.e. repeated map keys. */
    std::random_device randdev;
    list l;
    std::default_random_engine randgen(randdev());
    std::generate_n(std::back_inserter(l), N,
                    [&randgen]() { return randgen() % (N / 4); });
    /* Run one of various tests */
    using std::chrono::system_clock;
    using std::chrono::duration_cast;
    using std::chrono::milliseconds;
    auto start = system_clock::now();
    /* Fix: this flag is written by the main thread while the reporter
       thread reads it; a plain bool was a data race (UB).  An atomic
       makes the cross-thread store/load well-defined. */
    std::atomic<bool> calls_reporter_working(true);
    /* Progress reporter: prints the global append() counter once per
       second until told to stop (so join below may lag ~1 period). */
    std::thread calls_reporter([&calls_reporter_working]() {
        milliseconds period{1000};
        while(calls_reporter_working) {
            std::this_thread::sleep_for(period);
            std::cerr << append_calls << " append() calls" << std::endl;
            std::cerr.flush();
        }
    });
    if(test_name == "uni") {
        unsafe_map_of_list_of_lists p;
        unithread_process(p, l);
    }
    else if(test_name == "mutex") {
        mutex_map_of_list_of_lists p;
        if(test_flavour == "chunked")
            async_process_chunked(p, l);
        else if(test_flavour == "threaded")
            threaded_process(p, l);
        else
            async_process(p, l);
    }
    else if(test_name == "tbb") {
        concurrent_map_of_list_of_lists p;
        if(test_flavour == "chunked")
            async_process_chunked(p, l);
        else if(test_flavour == "threaded")
            threaded_process(p, l);
        else
            async_process(p, l);
    }
    else {
        /* Fix: an unknown test name used to fall through silently and
           report a meaningless near-zero timing. */
        std::cerr << "unknown test name: " << test_name << std::endl;
    }
    auto time = duration_cast<milliseconds>(system_clock::now() - start);
    std::cerr << time.count() << "ms" << std::endl;
    std::cerr << append_calls << " append() calls" << std::endl;
    calls_reporter_working = false;
    calls_reporter.join();
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment