Skip to content

Instantly share code, notes, and snippets.

@llhe
Created June 21, 2017 08:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save llhe/f4b97c34e5756e9cd778de4857ab21f1 to your computer and use it in GitHub Desktop.
Save llhe/f4b97c34e5756e9cd778de4857ab21f1 to your computer and use it in GitHub Desktop.
memcpy benchmark
/*
 * Build: g++ -std=c++11 -O2 -pthread memtest.cc
 * (compile with optimization enabled; an unoptimized build understates
 * the achievable memcpy bandwidth)
 */
#include <cstring>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>
// Minimal fixed-size thread pool (C++11).
//
// Worker threads block on a condition variable and pull type-erased
// std::function<void()> jobs from a shared FIFO queue. submit() packages
// an arbitrary callable (plus bound arguments) into such a job and hands
// back a std::future for its result. On destruction the pool finishes
// every queued job before joining the workers.
class thread_pool {
public:
    // Spawns `threads` workers that start consuming jobs immediately.
    thread_pool(size_t threads);
    // Requests shutdown, drains the remaining queue, joins all workers.
    ~thread_pool();
    // Enqueues task(args...); returns a future for its result.
    // Throws std::runtime_error if the pool is already shutting down.
    template<class Task, class... Args>
    auto submit(Task&& task, Args&&... args)
        -> std::future<typename std::result_of<Task(Args...)>::type>;
private:
    std::vector<std::thread> workers_;              // worker threads
    std::queue<std::function<void()>> task_queue_;  // pending jobs (guarded by mutex_)
    std::mutex mutex_;                              // protects task_queue_ and should_stop_
    std::condition_variable cond_;                  // "work available or stopping"
    bool should_stop_;                              // set once by the destructor
};

inline thread_pool::thread_pool(size_t threads) : should_stop_(false) {
    workers_.reserve(threads);
    for (size_t n = 0; n < threads; ++n) {
        workers_.emplace_back([this] {
            for (;;) {
                std::function<void()> job;
                {
                    std::unique_lock<std::mutex> guard(mutex_);
                    // Sleep until there is a job or shutdown is requested.
                    cond_.wait(guard, [this] {
                        return should_stop_ || !task_queue_.empty();
                    });
                    // Exit only once shutdown is requested AND the queue
                    // is fully drained — queued jobs are never dropped.
                    if (should_stop_ && task_queue_.empty()) {
                        break;
                    }
                    job = std::move(task_queue_.front());
                    task_queue_.pop();
                }
                // Run outside the lock so other workers can dequeue.
                job();
            }
        });
    }
}

inline thread_pool::~thread_pool() {
    {
        // lock_guard suffices here: no condition-variable wait on this path.
        std::lock_guard<std::mutex> guard(mutex_);
        should_stop_ = true;
    }
    cond_.notify_all();
    for (auto& worker : workers_) {
        worker.join();
    }
}

template<class Task, class... Args>
auto thread_pool::submit(Task&& task, Args&&... args)
    -> std::future<typename std::result_of<Task(Args...)>::type>
{
    using result_t = typename std::result_of<Task(Args...)>::type;
    // The callable and its arguments are bound into a heap-allocated
    // packaged_task held by shared_ptr, because std::function requires a
    // copyable target and packaged_task itself is move-only.
    auto job = std::make_shared<std::packaged_task<result_t()>>(
        std::bind(std::forward<Task>(task), std::forward<Args>(args)...));
    std::future<result_t> result = job->get_future();
    {
        std::lock_guard<std::mutex> guard(mutex_);
        if (should_stop_) {
            throw std::runtime_error("Thread pool stopped");
        }
        task_queue_.emplace([job] { (*job)(); });
    }
    cond_.notify_one();
    return result;
}
// Single-threaded memcpy bandwidth benchmark.
//
// For each buffer size, copies `total_bytes` bytes in size-sized chunks
// and prints the achieved throughput in GB/sec.
//
// sizes:       buffer sizes (in bytes) to benchmark.
// total_bytes: total bytes copied per size (default 64 GB, matching the
//              original hard-coded workload; lower it for quick runs).
//
// Fixes vs. the original:
//  - std::unique_ptr<char> owning a new[] array was undefined behavior
//    (scalar delete on array storage); use unique_ptr<char[]> so delete[]
//    is invoked.
//  - The elapsed time is measured as duration<double> seconds, avoiding
//    the divide-by-zero (printed "inf") that millisecond truncation caused
//    on sub-millisecond runs.
void benchmark_st(const std::vector<size_t>& sizes = {
                      1 << 8,   // 256
                      1 << 10,  // 1K
                      1 << 12,  // 4K
                      1 << 20,  // 1M
                      1 << 22,  // 4M
                      1 << 27,  // 128M
                      1 << 28,  // 256M
                      1 << 29,  // 512M
                      1 << 30,  // 1G
                  },
                  long total_bytes = 1L << 36 /* 64 GB */) {
    constexpr long size_1g = 1L << 30;
    // Allocate all source/destination buffers up front.
    std::vector<std::unique_ptr<char[]>> srcs, dsts;
    for (size_t size : sizes) {
        srcs.emplace_back(new char[size]);
        dsts.emplace_back(new char[size]);
    }
    for (size_t i = 0; i < sizes.size(); ++i) {
        char *src = srcs[i].get();
        char *dst = dsts[i].get();
        size_t size = sizes[i];
        // Warm-up: fault in the pages and prime the caches.
        for (int j = 0; j < 10; ++j) {
            memcpy(dst, src, size);
        }
        // Timed copies: total_bytes moved in `size`-byte chunks.
        long max_iters = total_bytes / static_cast<long>(size);
        auto t0 = std::chrono::steady_clock::now();
        for (long j = 0; j < max_iters; ++j) {
            memcpy(dst, src, size);
        }
        auto t1 = std::chrono::steady_clock::now();
        std::chrono::duration<double> seconds = t1 - t0;
        double copied_gb = static_cast<double>(max_iters) * size / size_1g;
        double speed = copied_gb / seconds.count();
        std::cout << "size=" << size << "\t: " << speed << " GB/sec\n";
    }
}
// Multi-threaded memcpy bandwidth benchmark.
//
// Splits one `size`-byte buffer into thread_num equal blocks and copies
// them concurrently via the thread pool, repeating until `total_bytes`
// have been moved per thread count; prints throughput in GB/sec.
//
// thread_nums: thread counts to benchmark.
// size:        buffer size in bytes (should be divisible by every entry
//              of thread_nums — any remainder tail is not copied).
// total_bytes: total bytes copied per thread count (default 64 GB,
//              matching the original hard-coded workload).
//
// Fixes vs. the original:
//  - unique_ptr<char> owning new[] storage was undefined behavior; use
//    unique_ptr<char[]> so delete[] matches new[].
//  - Elapsed time measured as duration<double> to avoid divide-by-zero
//    on very short runs.
void benchmark_mt(const std::vector<int>& thread_nums = {1, 2, 4, 8, 16},
                  size_t size = 1 << 27 /* 128M */,
                  long total_bytes = 1L << 36 /* 64 GB */) {
    constexpr long size_1g = 1L << 30;
    std::unique_ptr<char[]> src(new char[size]);
    std::unique_ptr<char[]> dst(new char[size]);
    // Warm-up: fault in both buffers.
    for (int i = 0; i < 10; ++i) {
        memcpy(dst.get(), src.get(), size);
    }
    for (int thread_num : thread_nums) {
        thread_pool pool(thread_num);
        long max_iters = total_bytes / static_cast<long>(size);
        size_t block_size = size / thread_num;
        auto t0 = std::chrono::steady_clock::now();
        std::vector<std::future<int>> results;
        results.reserve(thread_num);
        for (long i = 0; i < max_iters; ++i) {
            // Issue one block-copy task per worker...
            for (int j = 0; j < thread_num; ++j) {
                char *src_buf = src.get();
                char *dst_buf = dst.get();
                results.emplace_back(
                    pool.submit([src_buf, dst_buf, j, block_size] {
                        memcpy(dst_buf + j * block_size,
                               src_buf + j * block_size, block_size);
                        return 0;
                    })
                );
            }
            // ...and wait for the whole round before starting the next,
            // so at most thread_num copies are in flight at once.
            for (auto && result : results) {
                result.get();
            }
            results.clear();
        }
        auto t1 = std::chrono::steady_clock::now();
        std::chrono::duration<double> seconds = t1 - t0;
        double copied_gb = static_cast<double>(max_iters) * size / size_1g;
        double speed = copied_gb / seconds.count();
        std::cout << "thread=" << thread_num << "\t: " << speed << " GB/sec\n";
    }
}
// Entry point: run the single-threaded benchmark, then the
// multi-threaded one. Command-line arguments are accepted but unused.
int main(int argc, char **argv) {
    static_cast<void>(argc);
    static_cast<void>(argv);
    std::cout << "---------------- single thread memcpy ---------------\n";
    benchmark_st();
    std::cout << "---------------- multiple thread memcpy ---------------\n";
    benchmark_mt();
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment