Skip to content

Instantly share code, notes, and snippets.

@ivanstepanovftw
Created April 21, 2018 12:20
Show Gist options
  • Save ivanstepanovftw/dcc83aede3f2254925ad36d6e48058f1 to your computer and use it in GitHub Desktop.
Save ivanstepanovftw/dcc83aede3f2254925ad36d6e48058f1 to your computer and use it in GitHub Desktop.
cmake_minimum_required(VERSION 3.9)
project(threaded_handler)
set(CMAKE_CXX_STANDARD 17)

# Portable thread-library detection instead of a hard-coded -lpthread.
find_package(Threads REQUIRED)

# Boost is only used when main.cc is compiled without I_HAVE_NO_BOOST, so it
# is looked up but not REQUIRED. Naming the components lets FindBoost resolve
# the actual library files into Boost_LIBRARIES instead of relying on raw
# -lboost_* flags that break the link on machines without Boost.
find_package(Boost COMPONENTS iostreams chrono thread system)

link_directories(
        ${Boost_LIBRARY_DIRS}
)
include_directories(
        ${Boost_INCLUDE_DIRS}
)

add_executable(threaded_handler
        # thread_pool_example.cc
        main.cc
        # thread_pool.cc
        # thread_pool_another.cc
        # func_arg_reordering.cc
        # mapped_file_example.cc
)

target_link_libraries(threaded_handler
        Threads::Threads
        ${Boost_LIBRARIES}
)
/* Based on: https://github.com/lemire/Code-used-on-Daniel-Lemire-s-blog/blob/master/2012/06/26/ioaccess.cpp
* Extended with a deliberately bad example that uses a stride: https://jvns.ca/blog/2014/05/12/computers-are-fast/
*/
#define I_HAVE_NO_BOOST
//#define USE_STRIDE
#include <fcntl.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <sys/resource.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
// NOTE(review): zlib's configuration header; nothing visible in this file appears to need it.
#include <zconf.h>
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <fstream>
#include <functional>
#include <iomanip>
#include <iostream>
#include <iostream>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>
#ifndef I_HAVE_NO_BOOST
#define BOOST_BIND_NO_PLACEHOLDERS
#include <boost/iostreams/device/mapped_file.hpp>
#include <boost/chrono.hpp>
#include <boost/thread.hpp>
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/scoped_ptr.hpp>
#endif
// Element type stored in every region of the benchmark file.
using TYPE = size_t;
// Accumulator type returned by compute() and by every test_* reader.
using ANSWER_TYPE = size_t;
// Scratch file written by generate_file() and removed again by main().
const char *filename = "DELME.BIN";
// Region geometry: 4 MiB of payload per region, in bytes and in TYPE elements.
constexpr size_t max_region_size = size_t{4} * 1024 * 1024;
constexpr size_t max_region_length = max_region_size / sizeof(TYPE);
// Repetitions: the outer count regenerates the file, the inner one reruns each reader.
constexpr size_t measures_overall = 1;
constexpr size_t measures_each = 1;
// Number of [length][payload] records main() asks generate_file() to write.
constexpr size_t regions_count = 10;
// Interleave factor of the USE_STRIDE variant of compute().
constexpr size_t stride_size = 2;
// NOTE(review): undocumented notes kept verbatim. 137438691328 equals
// 0 + 1 + ... + 524287, i.e. the sum of one generated region
// (max_region_length - 1 == 524287); the meaning of the other number is unclear.
//use 94761707417984 !524287
//not 137438691328 !524287
void generate_file(const char *file_path, const size_t regions_count, const size_t region_length)
{
FILE *fp = fopen(file_path, "wb+");
assert(fp != NULL);
TYPE buffer[region_length];
for(size_t i = 0; i < region_length; i++)
buffer[i] = static_cast<TYPE>(i);
for(size_t r = 0; r < regions_count; r++) {
assert(fwrite((void *)&region_length, sizeof(size_t), 1, fp) == 1);
assert(fwrite((void *)buffer, sizeof(TYPE), region_length, fp) == region_length);
}
fflush(fp);
fclose(fp);
}
#ifndef USE_STRIDE
/// Sums the first `region_length` elements of `buffer`, front to back.
ANSWER_TYPE compute(TYPE *buffer, size_t region_length)
{
    ANSWER_TYPE total = 0;
    const TYPE *const end = buffer + region_length;
    for (const TYPE *cursor = buffer; cursor != end; ++cursor)
        total += *cursor;
    return total;
}
#else
/// Deliberately cache-unfriendly variant: sums the same elements, but in
/// `stride_size` interleaved passes (offsets 0, stride, 2*stride, ..., then
/// 1, 1+stride, ...), so consecutive reads are `stride_size` elements apart.
ANSWER_TYPE compute(TYPE *buffer, size_t region_length)
{
    ANSWER_TYPE total = 0;
    for (size_t phase = 0; phase != stride_size; ++phase) {
        size_t index = phase;
        while (index < region_length) {
            total += buffer[index];
            index += stride_size;
        }
    }
    return total;
}
#endif
/// Prints one benchmark line to std::cout:
/// "<function_name left-aligned in 28 columns>done in: <seconds, 8 decimals> sec, result: <result>."
/// The left / fixed / setprecision(8) / showbase / setfill(' ') settings
/// remain applied to std::cout afterwards.
void log_result(const char *function_name, double done_in, ANSWER_TYPE result)
{
    std::ostream &out = std::cout;
    out << std::setfill(' ') << std::left << std::setw(28) << function_name;
    out << "done in: " << std::fixed << std::setprecision(8) << done_in;
    out << " sec, result: " << std::showbase << result << ".";
    out << std::endl;
}
/// Benchmark: sums every region of the file with buffered C stdio (fread).
///
/// @param file_path     file written by generate_file()
/// @param regions_count number of [size_t length][payload] records to read
/// @param vbuf          if non-zero, install a fully-buffered stdio buffer of
///                      this many bytes via setvbuf(); 0 keeps libc's default
/// @return sum of compute() over all regions read
/// @throws std::runtime_error on open/setvbuf/read failure, on a short read,
///         or when a region header exceeds max_region_length
ANSWER_TYPE test_fread(const char *file_path, size_t regions_count, size_t vbuf = 0)
{
    struct file_closer {
        void operator()(FILE *f) const { fclose(f); }
    };
    // Declared before the FILE so it is destroyed after fclose(): stdio keeps
    // using this storage for as long as the stream is open.
    std::vector<char> io_buffer(vbuf);
    std::unique_ptr<FILE, file_closer> fd{fopen(file_path, "rb")};
    if (!fd) {
        const int err = errno;
        throw std::runtime_error(std::string("test_fread: cannot open ") + file_path + ": " + strerror(err));
    }
    // An explicit buffer is supplied because, given a NULL buffer, glibc keeps
    // its default buffer size and ignores `vbuf`. Checked explicitly rather
    // than via assert(), whose argument (the call itself) vanishes under NDEBUG.
    if (vbuf != 0 && setvbuf(fd.get(), io_buffer.data(), _IOFBF, vbuf) != 0)
        throw std::runtime_error("test_fread: setvbuf failed");
    std::vector<TYPE> buffer(max_region_length);
    ANSWER_TYPE answer = 0;
    for(size_t r = 0; r < regions_count; r++) {
        size_t region_length = 0;
        if (fread(&region_length, sizeof(size_t), 1, fd.get()) != 1)
            throw std::runtime_error("test_fread: failed to read region header");
        // Guards the fixed-size buffer against a corrupt or foreign file.
        if (region_length > buffer.size())
            throw std::runtime_error("test_fread: region exceeds max_region_length");
        if (fread(buffer.data(), sizeof(TYPE), region_length, fd.get()) != region_length)
            throw std::runtime_error("test_fread: failed to read region payload");
        answer += compute(buffer.data(), region_length);
    }
    return answer;
}
/// Benchmark: sums every region of the file with unbuffered POSIX read(2).
///
/// @param file_path     file written by generate_file()
/// @param regions_count number of [size_t length][payload] records to read
/// @return sum of compute() over all regions read
/// @throws std::runtime_error on open/read failure, premature EOF, or when a
///         region header exceeds max_region_length
ANSWER_TYPE test_read(const char *file_path, size_t regions_count)
{
    // Closes the descriptor on every exit path, including exceptions.
    struct fd_guard {
        int fd;
        ~fd_guard() { if (fd != -1) close(fd); }
    };
    const fd_guard file{open(file_path, O_RDONLY)};
    if (file.fd == -1) {
        const int err = errno;
        throw std::runtime_error(std::string("test_read: cannot open ") + file_path + ": " + strerror(err));
    }
    // read(2) may return fewer bytes than requested or fail with EINTR, so one
    // call is not enough. Returns false on EOF or a real error.
    const auto read_fully = [&file](void *dst, size_t size) {
        char *out = static_cast<char *>(dst);
        while (size > 0) {
            const ssize_t got = read(file.fd, out, size);
            if (got < 0 && errno == EINTR)
                continue;
            if (got <= 0)
                return false;
            out += got;
            size -= static_cast<size_t>(got);
        }
        return true;
    };
    std::vector<TYPE> buffer(max_region_length);
    ANSWER_TYPE answer = 0;
    for(size_t r = 0; r < regions_count; r++) {
        size_t region_length = 0;
        if (!read_fully(&region_length, sizeof(size_t)))
            throw std::runtime_error("test_read: failed to read region header");
        // Guards the fixed-size buffer against a corrupt or foreign file.
        if (region_length > buffer.size())
            throw std::runtime_error("test_read: region exceeds max_region_length");
        if (!read_fully(buffer.data(), region_length * sizeof(TYPE)))
            throw std::runtime_error("test_read: failed to read region payload");
        answer += compute(buffer.data(), region_length);
    }
    return answer;
}
/// Benchmark: maps the whole file with mmap() and sums each region in place.
///
/// @param file_path     file written by generate_file()
/// @param regions_count unused: the entire mapping is walked region by region
/// @param advise        also issue madvise(MADV_SEQUENTIAL) and madvise(MADV_WILLNEED)
/// @param shared        map with MAP_SHARED instead of MAP_PRIVATE
/// @return sum of compute() over all regions in the file (0 for an empty file)
/// @throws std::runtime_error on open/fstat/mmap/madvise failure, or when the
///         region headers do not fit inside the file
ANSWER_TYPE test_mmap(const char *file_path, size_t regions_count, bool advise, bool shared)
{
    (void)regions_count; // kept for a signature uniform with the other readers
    const int fd = open(file_path, O_RDONLY);
    if (fd == -1) {
        const int err = errno;
        throw std::runtime_error(std::string("test_mmap: cannot open ") + file_path + ": " + strerror(err));
    }
    struct stat s{};
    if (fstat(fd, &s) != 0) {
        const int err = errno;
        close(fd);
        throw std::runtime_error(std::string("test_mmap: fstat failed: ") + strerror(err));
    }
    const size_t file_size = size_t(s.st_size);
    std::cout<<"filesize: "<<file_size<<std::endl;
    if (file_size == 0) {
        // mmap() rejects a zero length with EINVAL; an empty file sums to 0.
        close(fd);
        return 0;
    }
    int flags = MAP_FILE | (shared ? MAP_SHARED : MAP_PRIVATE);
#ifdef __linux__
    flags |= MAP_POPULATE; // Linux-only: pre-fault the whole mapping up front
#endif
    void *const mapped = mmap(NULL, file_size, PROT_READ, flags, fd, 0);
    const int mmap_errno = errno;
    // The mapping remains valid after its descriptor is closed.
    close(fd);
    if (mapped == MAP_FAILED)
        throw std::runtime_error(std::string("test_mmap: mmap failed: ") + strerror(mmap_errno));
    // Unmaps on every exit path, including the throws below.
    struct unmapper {
        void *addr;
        size_t length;
        ~unmapper() { munmap(addr, length); }
    };
    const unmapper mapping{mapped, file_size};
    if (advise) {
        // madvise() advice values are an enumeration, not bit flags: on Linux
        // MADV_SEQUENTIAL | MADV_WILLNEED (2 | 3) is just MADV_WILLNEED, so
        // each hint needs its own call.
        if (madvise(mapped, file_size, MADV_SEQUENTIAL) != 0 ||
            madvise(mapped, file_size, MADV_WILLNEED) != 0) {
            const int err = errno;
            throw std::runtime_error(std::string("test_mmap: madvise failed: ") + strerror(err));
        }
    }
    char *map = static_cast<char *>(mapped);
    char *const map_end = map + file_size; // [map, map_end)
    ANSWER_TYPE answer = 0;
    while (map < map_end) {
        if (size_t(map_end - map) < sizeof(size_t))
            throw std::runtime_error("test_mmap: truncated region header");
        size_t region_length;
        memcpy(&region_length, map, sizeof(size_t));
        map += sizeof(size_t);
        // Never let a corrupt header make compute() read past the mapping.
        if (region_length > size_t(map_end - map) / sizeof(TYPE))
            throw std::runtime_error("test_mmap: region runs past end of file");
        answer += compute(reinterpret_cast<TYPE *>(map), region_length);
        map += region_length * sizeof(TYPE);
    }
    return answer;
}
/// Benchmark: sums every region of the file using std::ifstream::read.
///
/// @param file_path     file written by generate_file()
/// @param regions_count number of [size_t length][payload] records to read
/// @return sum of compute() over all regions read
/// @throws std::runtime_error if the file cannot be opened, a read fails or
///         comes up short, or a region header exceeds max_region_length
ANSWER_TYPE test_ifstream(const char *file_path, size_t regions_count)
{
    std::ifstream in(file_path, std::ios::binary);
    if (!in.is_open())
        throw std::runtime_error(std::string("test_ifstream: cannot open ") + file_path);
    std::vector<TYPE> buffer(max_region_length);
    ANSWER_TYPE answer = 0;
    for(size_t r = 0; r < regions_count; r++) {
        size_t region_length = 0;
        if (!in.read(reinterpret_cast<char *>(&region_length), sizeof(size_t)))
            throw std::runtime_error("test_ifstream: failed to read region header");
        // Guards the fixed-size buffer against a corrupt or foreign file.
        if (region_length > buffer.size())
            throw std::runtime_error("test_ifstream: region exceeds max_region_length");
        // The payload is sized in TYPE units; sizing it with sizeof(size_t)
        // would overrun `buffer` whenever TYPE is narrower than size_t.
        const std::streamsize region_size = std::streamsize(region_length * sizeof(TYPE));
        if (!in.read(reinterpret_cast<char *>(buffer.data()), region_size))
            throw std::runtime_error("test_ifstream: failed to read region payload");
        answer += compute(buffer.data(), region_length);
    }
    return answer;
}
#ifndef I_HAVE_NO_BOOST
/// Benchmark (Boost): maps the file with boost::iostreams::mapped_file and
/// sums each region in place.
///
/// @param file_path     file written by generate_file()
/// @param regions_count unused: the entire mapping is walked region by region
/// @return sum of compute() over all regions in the file
/// @throws std::runtime_error when the region headers do not fit inside the
///         file (Boost reports open/map failures with its own exceptions)
ANSWER_TYPE test_mapped_file(const char *file_path, size_t regions_count)
{
    namespace bio = boost::iostreams;
    ANSWER_TYPE answer = 0;
    bio::mapped_file mf(file_path);
    assert(mf.is_open());
    char *map = mf.data();
    char *const map_end = map + size_t(mf.size()); // [map, map_end)
    // No console output in this loop: it would be timed together with the
    // summation being measured.
    while (map < map_end) {
        if (size_t(map_end - map) < sizeof(size_t))
            throw std::runtime_error("test_mapped_file: truncated region header");
        size_t region_length;
        memcpy(&region_length, map, sizeof(size_t));
        map += sizeof(size_t);
        // Never let a corrupt header make compute() read past the mapping.
        if (region_length > size_t(map_end - map) / sizeof(TYPE))
            throw std::runtime_error("test_mapped_file: region runs past end of file");
        answer += compute(reinterpret_cast<TYPE *>(map), region_length);
        map += region_length*sizeof(TYPE);
    }
    return answer;
}
namespace with_copy
{
/// Benchmark (Boost): like ::test_mapped_file, but each region is first
/// memcpy'd out of the mapping into a reusable heap buffer and summed there.
///
/// @param file_path     file written by generate_file()
/// @param regions_count unused: the entire mapping is walked region by region
/// @return sum of compute() over all regions in the file
/// @throws std::runtime_error when a region header is truncated, exceeds
///         max_region_length, or points past the end of the file
ANSWER_TYPE test_mapped_file(const char *file_path, size_t regions_count)
{
    namespace bio = boost::iostreams;
    ANSWER_TYPE answer = 0;
    bio::mapped_file mf(file_path);
    assert(mf.is_open());
    std::vector<TYPE> buffer(max_region_length);
    char *map = mf.data();
    char *const map_end = map + size_t(mf.size()); // [map, map_end)
    while (map < map_end) {
        if (size_t(map_end - map) < sizeof(size_t))
            throw std::runtime_error("with_copy::test_mapped_file: truncated region header");
        size_t region_length;
        memcpy(&region_length, map, sizeof(size_t));
        map += sizeof(size_t);
        // Both limits matter: the copy must fit into `buffer` and must not
        // read beyond the end of the mapping.
        if (region_length > buffer.size() || region_length > size_t(map_end - map) / sizeof(TYPE))
            throw std::runtime_error("with_copy::test_mapped_file: invalid region length");
        const size_t region_size = region_length*sizeof(TYPE);
        memcpy(buffer.data(), map, region_size);
        answer += compute(buffer.data(), region_length);
        map += region_size;
    }
    return answer;
}
}
namespace thread_group
{
ANSWER_TYPE answer;
boost::mutex getter_mutex;
boost::mutex putter_mutex;
void call(char *map, size_t region_length)
{
TYPE buffer[region_length];
{
boost::mutex::scoped_lock getter_lock(getter_mutex);
// std::clog<<"copying: map: "<<&map<<std::endl;
memcpy(buffer, map, region_length*sizeof(TYPE));
}
ANSWER_TYPE ans = compute(buffer, region_length);
{
boost::mutex::scoped_lock putter_lock(putter_mutex);
answer += ans;
}
}
ANSWER_TYPE test_mapped_file(const char *file_path, size_t regions_count)
{
namespace bio = boost::iostreams;
bio::mapped_file mf(file_path);
assert(mf.is_open());
char *map = mf.data();
char *map_end = map + size_t(mf.size()); // [map_begin, map_end)
size_t region_length;
answer = 0;
boost::asio::io_service io_service;
boost::thread_group threads;
{
boost::scoped_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(io_service));
for(size_t t = 0; t < boost::thread::hardware_concurrency(); t++) {
threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service));
}
{
boost::mutex::scoped_lock getter_lock(getter_mutex);
while (map < map_end) {
memcpy(&region_length, map, sizeof(size_t));
map += sizeof(size_t);
io_service.post(boost::bind(call, map, region_length));
map += region_length*sizeof(TYPE);
}
}
}
threads.join_all();
return answer;
}
}
namespace task_queue
{
using namespace std;
ANSWER_TYPE answer;
std::mutex getter_mutex;
std::mutex putter_mutex;
void call(TYPE *buffer, char *map, size_t region_length)
{
{
std::scoped_lock<std::mutex> lock{getter_mutex};
// std::clog<<"copying: map: "<<&map<<std::endl;
memcpy(buffer, map, region_length*sizeof(TYPE));
}
ANSWER_TYPE ans = compute(buffer, region_length);
{
std::scoped_lock<std::mutex> lock{putter_mutex};
answer += ans;
}
}
class task_queue_t
{
public:
typedef function<void(TYPE *)> task_t;
vector<thread> pool_m;
deque<task_t> deque_m;
condition_variable condition_m;
mutex mutex_m;
atomic<bool> done_m{false};
task_queue_t(size_t region_length, size_t pool_size = thread::hardware_concurrency())
{
pool_m.reserve(pool_size);
for(size_t i = 0; i < pool_size; i++)
pool_m.emplace_back(bind(&task_queue_t::worker, this, region_length, i));
}
~task_queue_t()
{
join_all();
}
void join_all()
{
unique_lock<mutex> lock{mutex_m};
lock.unlock();
if (done_m.exchange(true))
return;
condition_m.notify_all();
for(auto &thread : pool_m) {
thread.join();
}
}
template<typename F>
void push(F &&function)
{
unique_lock<mutex> lock{mutex_m};
deque_m.emplace_back(forward<F>(function));
condition_m.notify_one();
}
private:
void worker(size_t region_length, size_t i)
{
task_t task;
TYPE *buffer = new TYPE[region_length];
while (true) {
unique_lock<mutex> lock{mutex_m};
condition_m.wait(lock, [=]() { return done_m || !deque_m.empty(); });
if (deque_m.empty())
break;
task = deque_m.front();
deque_m.pop_front();
lock.unlock();
task(buffer);
}
delete[] buffer;
}
task_queue_t(const task_queue_t &) = delete;
task_queue_t(task_queue_t &&) = delete;
task_queue_t &operator=(const task_queue_t &) = delete;
task_queue_t &operator=(task_queue_t &&) = delete;
};
ANSWER_TYPE test_mapped_file(const char *file_path, size_t regions_count)
{
using namespace std::placeholders;
namespace bio = boost::iostreams;
bio::mapped_file mf(file_path);
assert(mf.is_open());
char *map = mf.data();
char *map_end = map + size_t(mf.size()); // [map_begin, map_end)
size_t region_length;
answer = 0;
task_queue_t pool(max_region_length);
while (map < map_end) {
memcpy(&region_length, map, sizeof(size_t));
map += sizeof(size_t);
const size_t region_size = region_length*sizeof(TYPE);
pool.push(bind(call, _1, map, region_length));
map += region_size;
}
pool.join_all();
return answer;
}
}
#endif
int main()
{
using namespace std;
using namespace std::chrono;
ANSWER_TYPE result = 0;
high_resolution_clock::time_point t1;
high_resolution_clock::time_point t2;
for(size_t m = 0; m < measures_overall; m++) {
generate_file(filename, regions_count, max_region_length);
cout<<endl;
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += test_fread(filename, regions_count);
t2 = high_resolution_clock::now();
log_result("fread", duration_cast<duration<double>>(t2 - t1).count(), result);
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += test_fread(filename, regions_count, 4 * 1024); // 4 KiB
t2 = high_resolution_clock::now();
log_result("fread with 4 KiB buffer", duration_cast<duration<double>>(t2 - t1).count(), result);
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += test_fread(filename, regions_count, 32 * 1024 * 1024); // 32 MiB
t2 = high_resolution_clock::now();
log_result("fread with 32 MiB buffer", duration_cast<duration<double>>(t2 - t1).count(), result);
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += test_read(filename, regions_count);
t2 = high_resolution_clock::now();
log_result("read", duration_cast<duration<double>>(t2 - t1).count(), result);
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += test_mmap(filename, regions_count, false, false);
t2 = high_resolution_clock::now();
log_result("mmap", duration_cast<duration<double>>(t2 - t1).count(), result);
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += test_mmap(filename, regions_count, true, false);
t2 = high_resolution_clock::now();
log_result("mmap fancy", duration_cast<duration<double>>(t2 - t1).count(), result);
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += test_mmap(filename, regions_count, false, true);
t2 = high_resolution_clock::now();
log_result("mmap shared", duration_cast<duration<double>>(t2 - t1).count(), result);
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += test_mmap(filename, regions_count, true, true);
t2 = high_resolution_clock::now();
log_result("mmap fancy shared", duration_cast<duration<double>>(t2 - t1).count(), result);
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += test_ifstream(filename, regions_count);
t2 = high_resolution_clock::now();
log_result("ifstream", duration_cast<duration<double>>(t2 - t1).count(), result);
#ifndef I_HAVE_NO_BOOST
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += test_mapped_file(filename, regions_count);
t2 = high_resolution_clock::now();
log_result("memory_mapped", duration_cast<duration<double>>(t2 - t1).count(), result);
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += with_copy::test_mapped_file(filename, regions_count);
t2 = high_resolution_clock::now();
log_result("memory_mapped with_copy", duration_cast<duration<double>>(t2 - t1).count(), result);
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += thread_group::test_mapped_file(filename, regions_count);
t2 = high_resolution_clock::now();
log_result("memory_mapped thread_group", duration_cast<duration<double>>(t2 - t1).count(), result);
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += task_queue::test_mapped_file(filename, regions_count);
t2 = high_resolution_clock::now();
log_result("memory_mapped task_queue", duration_cast<duration<double>>(t2 - t1).count(), result);
#endif
remove(filename);
}
}
/*
* DEBUG RESULTS
fread done in: 0.42606470 sec, result: 13743869132800.
fread with 4 KiB buffer done in: 0.49409973 sec, result: 13743869132800.
fread with 32 MiB buffer done in: 0.47028050 sec, result: 13743869132800.
read done in: 0.44382183 sec, result: 13743869132800.
mmap done in: 0.28163830 sec, result: 13743869132800.
mmap fancy done in: 0.27840858 sec, result: 13743869132800.
mmap shared done in: 0.30134640 sec, result: 13743869132800.
mmap fancy shared done in: 0.30472618 sec, result: 13743869132800.
ifstream done in: 0.39622994 sec, result: 13743869132800.
memory_mapped done in: 0.27899714 sec, result: 13743869132800.
memory_mapped with_copy done in: 0.39397306 sec, result: 13743869132800.
memory_mapped thread_group done in: 0.45736181 sec, result: 13743869132800.
memory_mapped task_queue done in: 0.33702452 sec, result: 13743869132800.
* RELEASE RESULTS */
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment