-
-
Save ACUVE/02df595cd4ec9f78d143 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <limits>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>

#include <omp.h>
/// Call `func( args... )` once and return the elapsed time of that call.
///
/// Uses std::chrono::steady_clock, which is guaranteed to be monotonic, so
/// the measured interval cannot be distorted (or become negative) when the
/// wall clock is adjusted.  Any value returned by `func` is discarded.
///
/// @param func  callable to invoke; forwarded so rvalue-only callables work.
/// @param args  arguments perfectly forwarded to `func`.
/// @return elapsed time of the call, truncated to nanoseconds.
template< typename FUNC, typename... Args >
inline std::chrono::nanoseconds time( FUNC &&func, Args &&... args )
{
    auto const start = std::chrono::steady_clock::now();
    std::forward< FUNC >( func )( std::forward< Args >( args )... );
    auto const end = std::chrono::steady_clock::now();
    return std::chrono::duration_cast< std::chrono::nanoseconds >( end - start );
}
/// Fork-join thread pool in which the caller of Do() participates as worker 0.
///
/// The pool starts `num_thread - 1` background threads.  Do() publishes a job
/// (function pointer + opaque data pointer), bumps the generation number
/// `counter` under `mutex`, wakes the workers, runs its own share, and then
/// spins until every background worker has finished and is about to wait again.
///
/// NOTE(review): the generation protocol assumes a single controlling thread;
/// Do() and the destructor must not run concurrently with another Do().
/// A job that throws inside a background worker escapes the thread entry
/// point and therefore calls std::terminate.
class ThreadPool
{
public:
    /// Job signature.  `num` is the worker index in [0, thread_num), where 0 is
    /// the thread that called Do(); `thread_num` is the total number of workers.
    using func_type = void (*)( void *data, unsigned int num, unsigned int thread_num );
private:
    std::atomic< func_type > func;               // current job; nullptr asks the workers to exit
    std::atomic< void * > func_data;             // opaque argument of the current job
    std::mutex mutex;                            // guards `counter`
    std::atomic< unsigned int > waiting_counter; // background workers that have parked
    std::condition_variable cond;                // notified whenever `counter` changes
    unsigned int counter;                        // job generation, guarded by `mutex`
    unsigned int const num_thread;               // total workers, including the calling thread
    std::vector< std::thread > thread;           // the num_thread - 1 background workers
private:
    // Spin until exactly `n` background workers have reported that they are
    // about to wait for the next generation.
    void WaitWaiting( unsigned int const n )
    {
        while( waiting_counter.load() != n )
            std::this_thread::yield();
    }
    // Spin until every background worker has parked.
    void WaitAllWaiting()
    {
        WaitWaiting( num_thread - 1 );
    }
    // Publish the "exit" generation (func == nullptr) and join every started
    // worker.  All started workers must have parked first; otherwise a worker
    // that has not yet read its starting generation would read the exit
    // generation instead and wait forever.
    void Shutdown()
    {
        func.store( nullptr );
        {
            std::lock_guard< decltype( mutex ) > lock( mutex );
            counter += 1;
        }
        cond.notify_all();
        for( auto &&th : thread )
            th.join();
    }
public:
    /// @param hc requested number of workers (including the caller of Do());
    ///           0 (e.g. hardware_concurrency() being unknown) is treated as 1.
    /// Blocks until every background worker has started and parked.
    ThreadPool( unsigned int const hc = std::thread::hardware_concurrency() )
        : func( nullptr )
        , func_data( nullptr )
        , waiting_counter( 0 )
        , counter( 0 )
        , num_thread( hc != 0 ? hc : 1u )
    {
        unsigned int const len = num_thread - 1;
        try
        {
            // Reserve up front so only thread creation itself can fail mid-loop.
            thread.reserve( len );
            for( unsigned int i = 0; i < len; ++i )
                thread.emplace_back( &ThreadPool::Thread, this, i );
        }
        catch( ... )
        {
            // Destroying a joinable std::thread calls std::terminate, so stop and
            // join the workers that did start before propagating the error.
            WaitWaiting( static_cast< unsigned int >( thread.size() ) );
            Shutdown();
            throw;
        }
        WaitAllWaiting();
    }
    /// Signals every worker to exit and joins them.
    ~ThreadPool()
    {
        Shutdown();
    }
    /// Background worker loop.  `num` is the 0-based index of this background
    /// thread; it reports itself to jobs as worker `num + 1`.
    void Thread( unsigned int const num )
    {
        unsigned int c;
        {
            // Starting generation; read under the lock like every other access.
            std::lock_guard< decltype( mutex ) > lock( mutex );
            c = counter;
        }
        unsigned int const num_t = num_thread;
        while( true )
        {
            // Report "parked" before waiting; Do() spins on this count.
            waiting_counter.fetch_add( 1 );
            {
                std::unique_lock< decltype( mutex ) > lock( mutex );
                cond.wait( lock, [ & ] { return c != counter; } );
                c = counter;
            }
            auto const f = func.load();
            if( !f )
                break;
            f( func_data.load(), num + 1, num_t );
        }
    }
    /// Run `f( data, num, num_thread )` once on every worker (num = 0 on the
    /// calling thread) and return only after all of them have finished, even
    /// if the calling thread's share throws.  A nullptr `f` is a no-op.
    void Do( func_type f, void *data )
    {
        if( f == nullptr )
            return;
        // Relaxed is enough: the mutex hand-off below orders these stores
        // before the workers' loads.
        func.store( f, std::memory_order_relaxed );
        func_data.store( data, std::memory_order_relaxed );
        waiting_counter.store( 0, std::memory_order_release );
        {
            std::lock_guard< decltype( mutex ) > lock( mutex );
            counter += 1;
        }
        cond.notify_all();
        try
        {
            f( data, 0, num_thread );
        }
        catch( ... )
        {
            // Workers may still be using `data` (often on the caller's stack);
            // wait for them so it stays valid and the pool remains reusable.
            WaitAllWaiting();
            throw;
        }
        WaitAllWaiting();
    }
};
/// Per-call state for Parallel(): the user callable, the total iteration
/// count, and references to the extra arguments.  A pointer to an instance is
/// handed to ThreadPool::Do() as its opaque `data` pointer.
template< typename Func, typename UInt, typename... Args >
struct ParallelClass
{
    Func &&f;                   // user callable, invoked once per worker
    UInt c;                     // number of iterations to split across workers
    std::tuple< Args &&... > v; // extra arguments, held by reference

    // Invoke `self->f( lo, hi, args... )` on the half-open slice [lo, hi) owned
    // by worker `num` of `thread_num`.  Every worker gets c / thread_num
    // iterations and the first c % thread_num workers get one more, so the
    // slices for num = 0 .. thread_num-1 are contiguous and cover [0, c).
    template< std::size_t... I >
    static
    void ThreadImpl( ParallelClass *self, unsigned int const num, unsigned int const thread_num, std::index_sequence< I... > )
    {
        UInt const share = self->c / thread_num;
        UInt const leftover = self->c % thread_num;
        UInt const lo = share * num + ( num < leftover ? num : leftover );
        UInt const hi = lo + share + ( num < leftover );
        // NOTE(review): every worker forwards the same stored arguments, so an
        // rvalue argument could be moved from by more than one worker; passing
        // lvalues to Parallel() avoids that.
        self->f( lo, hi, std::forward< Args >( std::get< I >( self->v ) )... );
    }

    // Trampoline matching ThreadPool::func_type: recover the state object from
    // the opaque pointer and expand the argument tuple.
    static
    void Thread( void *data, unsigned int const num, unsigned int const thread_num )
    {
        auto *const self = static_cast< ParallelClass * >( data );
        ThreadImpl( self, num, thread_num, std::index_sequence_for< Args... >{} );
    }
};
template< typename Func, typename UInt, typename... Args > | |
void Parallel( ThreadPool &pool, UInt c, Func && func, Args &&... args ) | |
{ | |
using PC = ParallelClass< Func, UInt, Args... >; | |
PC d = { std::forward< Func >( func ), c, std::tuple< Args &&... >{ std::forward< Args >( args )... } }; | |
pool.Do( &PC::Thread, &d ); | |
} | |
/// Add to `*ret` a left Riemann sum of 1 / (1 + x^2) over [0, 1] with `c`
/// rectangles of width 1/c (an approximation of pi/4), with the rectangles
/// split across every worker of `pool`.
///
/// Each worker sums its slice locally, then adds that partial sum to `*ret`
/// with a compare-exchange retry loop, so `*ret` is incremented rather than
/// overwritten.
void thread( ThreadPool &pool, std::atomic< double > *const ret, unsigned int const c )
{
    auto const partial_sum = [ & ]( unsigned int const first, unsigned int const last )
    {
        double const width = 1.0 / c;
        double local = 0;
        for( unsigned int k = first; k < last; ++k )
        {
            double const x = width * k;
            local += width / ( x * x + 1.0 );
        }
        // std::atomic<double> has no fetch_add before C++20: retry until the
        // snapshot is still current when snapshot + local is published.
        double snapshot = ret->load();
        while( !std::atomic_compare_exchange_weak( ret, &snapshot, snapshot + local ) )
        {
        }
    };
    Parallel( pool, c, partial_sum );
}
/// Store into `*v` (overwriting it) the same left Riemann sum of
/// 1 / (1 + x^2) over [0, 1] with `c` rectangles, parallelized with an OpenMP
/// `parallel for` sum reduction.  Without OpenMP the pragma is ignored and
/// the loop runs serially.
void openmp( std::atomic< double > *const v, unsigned int const c )
{
    double const width = 1.0 / c;
    double total = 0;
#pragma omp parallel for reduction ( + : total )
    for( unsigned int k = 0; k < c; ++k )
    {
        double const x = width * k;
        total += width / ( x * x + 1.0 );
    }
    v->store( total );
}
int main( int argc, char **argv ) | |
{ | |
constexpr unsigned int NUM_TEST = 100000; | |
unsigned int const count = 10 * argc; | |
unsigned long long int mintime_t = std::numeric_limits< decltype( mintime_t ) >::max(); | |
unsigned long long int mintime_o = std::numeric_limits< decltype( mintime_o ) >::max(); | |
unsigned long long int sumtime_t = 0, sumtime_o = 0; | |
double minv_t = std::numeric_limits< decltype( minv_t ) >::max(); | |
double minv_o = std::numeric_limits< decltype( minv_o ) >::max(); | |
std::atomic< double > v; | |
{ | |
ThreadPool pool; | |
for( unsigned int i = 0; i < NUM_TEST; ++i ) | |
{ | |
v = 0; | |
unsigned long long int const t = time( thread, pool, &v, count ).count(); | |
mintime_t = std::min( mintime_t, t ); | |
minv_t = std::min( minv_t, v.load() ); | |
sumtime_t += t; | |
} | |
} | |
{ | |
for( unsigned int i = 0; i < NUM_TEST; ++i ) | |
{ | |
v = 0; | |
unsigned long long int const t = time( openmp, &v, count ).count(); | |
mintime_o = std::min( mintime_o, t ); | |
minv_o = std::min( minv_o, v.load() ); | |
sumtime_o += t; | |
} | |
} | |
{ | |
std::cout << "NUM_TSET: " << NUM_TEST << ", count: " << count << std::endl; | |
std::cout << "Thread min time[ns]: " << mintime_t << ", OpenMP min time[ns]: " << mintime_o << std::endl; | |
std::cout << "Thread mean time[ns]: " << static_cast< double >( sumtime_t ) / NUM_TEST << ", OpenMP mean time[ns]: " << static_cast< double >( sumtime_o ) / NUM_TEST << std::endl; | |
std::cout << "Thread_v: " << minv_t << ", OpenMP_v: " << minv_o << std::endl; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment