-
-
Save abby-sergz/12a2c1d25fc93c17838c878a11d7fe89 to your computer and use it in GitHub Desktop.
SSCCE: SyncCollection, ActiveObject and AsyncExecutor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <list>
#include <mutex>
#include <thread>
#include <utility>
namespace util { | |
// Minimal thread-safe FIFO wrapper around a sequence container.
// For the sake of simplicity only the basic functionality is provided:
// producers append with push_back(), consumers take with a blocking
// pop_front().
//
// Container must provide value_type, push_back(), front(), pop_front() and
// empty() (e.g. std::deque or std::list).
template<typename Container>
class SynchronizedCollection {
public:
    typedef typename Container::value_type value_type;

    // Appends a copy of `value` and wakes one waiting consumer.
    void push_back(const value_type& value) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_collection.push_back(value);
        }
        // Notify after releasing the lock so the woken consumer does not
        // immediately block on the mutex again.
        m_conditionVar.notify_one();
    }

    // Appends `value` by moving it in and wakes one waiting consumer.
    void push_back(value_type&& value) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_collection.push_back(std::move(value));
        }
        m_conditionVar.notify_one();
    }

    // Blocks until the collection is non-empty, then removes and returns the
    // first element.
    value_type pop_front() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_conditionVar.wait(lock, [this]()->bool {
            return !m_collection.empty();
        });
        // Move the element out instead of copying it: the slot is destroyed
        // on the next line anyway, and moving also makes the class usable
        // with move-only element types.
        value_type retValue = std::move(m_collection.front());
        m_collection.pop_front();
        return retValue;
    }

protected:
    Container m_collection;
    std::mutex m_mutex;                     // guards m_collection
    std::condition_variable m_conditionVar; // signalled on every push_back
};
// Sequentailly executes all calls in a single worker thread. | |
// Attention! It waits for finishing of all already posted calls. | |
class Active final { | |
public: | |
typedef std::function<void()> Call; | |
Active() | |
: m_isRunning(true) | |
{ | |
m_thread = std::thread([this] { | |
threadFunc(); | |
}); | |
} | |
~Active() { | |
post([this]{ | |
m_isRunning = false; | |
}); | |
m_thread.join(); | |
} | |
void post(const Call& call) { | |
if (!call) | |
return; | |
m_calls.push_back(call); | |
} | |
void post(Call&& call) { | |
if (!call) | |
return; | |
m_calls.push_back(std::move(call)); | |
} | |
private: | |
void threadFunc() { | |
while (m_isRunning) { | |
Call call = m_calls.pop_front(); | |
try { | |
call(); | |
} catch (...) { | |
// do nothing, but the thread will be alive. | |
} | |
} | |
} | |
private: | |
bool m_isRunning; | |
SynchronizedCollection<std::list<Call>> m_calls; | |
std::thread m_thread; | |
}; | |
// Spawns a new thread for each task and waits for finishing of all spawned threads in destructor. | |
class AsyncExecutor final { | |
// This class not only serializes access to the list of threads but also | |
// ensures that internals of std::thread are valid in another (collecting) | |
// thread. The latter is about sentries protecting A and B because | |
// otherwise it can happen that a worker thread has already finished the | |
// call, passed info to the collector thread and the collector is trying | |
// to get information from iterator (B) but the assignment (A) has not | |
// happened yet. | |
class SyncThreads final | |
{ | |
typedef std::list<std::thread> Threads; | |
public: | |
typedef Threads::iterator iterator; | |
typedef Threads::const_iterator const_iterator; | |
void spawn_thread(std::function<void(iterator)>&& task) { | |
std::lock_guard<std::mutex> lock(m_mutex); | |
auto ii_thread = m_collection.emplace(m_collection.end()); | |
*ii_thread = /* A */ std::thread(std::move(task), ii_thread); | |
// no need to notify here with the current usage of that class. | |
} | |
std::thread take_out(iterator pos) { | |
std::thread retValue; | |
{ | |
std::lock_guard<std::mutex> lock(m_mutex); | |
retValue = /* B */ std::move(*pos); | |
m_collection.erase(pos); | |
} | |
m_conditionVar.notify_one(); | |
return retValue; | |
} | |
void wait_util_empty() { | |
std::unique_lock<std::mutex> lock(m_mutex); | |
m_conditionVar.wait(lock, [this]()->bool { | |
return m_collection.empty(); | |
}); | |
} | |
protected: | |
Threads m_collection; | |
std::mutex m_mutex; | |
std::condition_variable m_conditionVar; | |
}; | |
public: | |
~AsyncExecutor() { | |
m_threads.wait_util_empty(); | |
} | |
void dispatch(const std::function<void()>& call) { | |
if (!call) | |
return; | |
m_threads.spawn_thread([this, call](SyncThreads::iterator ii_thread) { | |
call(); | |
m_threadCollector.post([this, ii_thread] { | |
m_threads.take_out(ii_thread).join(); | |
}); | |
}); | |
} | |
private: | |
SyncThreads m_threads; | |
Active m_threadCollector; | |
}; | |
} | |
#include <iostream> | |
#include <chrono> | |
#include <string> | |
void test() { | |
//* | |
util::Active syncOutput; | |
/*/ | |
// no op | |
struct { | |
void post(const util::Active::Call&){}; | |
} syncOutput; | |
//*/ | |
auto threadFunc = [&syncOutput](int t, const std::string msg) { | |
syncOutput.post([t, msg]{ | |
std::cout << "begin " << t << " - " << msg << std::endl; | |
}); | |
std::this_thread::sleep_for(std::chrono::seconds(t)); | |
syncOutput.post([t, msg]{ | |
std::cout << "end " << t << " - " << msg << std::endl; | |
}); | |
}; | |
int i = 1; | |
while (i-- > 0) { | |
util::AsyncExecutor executor; | |
executor.dispatch(std::bind(threadFunc, 1, "")); | |
executor.dispatch(std::bind(threadFunc, 4, "1")); | |
executor.dispatch(std::bind(threadFunc, 5, "")); | |
executor.dispatch(std::bind(threadFunc, 9, "")); | |
executor.dispatch(std::bind(threadFunc, 3, "1")); | |
executor.dispatch(std::bind(threadFunc, 4, "2")); | |
executor.dispatch(std::bind(threadFunc, 8, "")); | |
executor.dispatch(std::bind(threadFunc, 3, "2")); | |
executor.dispatch(std::bind(threadFunc, 3, "3")); | |
executor.dispatch(std::bind(threadFunc, 3, "4")); | |
executor.dispatch(std::bind(threadFunc, 7, "")); | |
executor.dispatch(std::bind(threadFunc, 2, "")); | |
} | |
} | |
// Program entry point: runs the demo once. Falling off the end of main is
// equivalent to returning 0.
int main() {
    test();
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
begin 1 - | |
begin 4 - 1 | |
begin 5 - | |
begin 9 - | |
begin 3 - 1 | |
begin 4 - 2 | |
begin 8 - | |
begin 3 - 2 | |
begin 3 - 3 | |
begin 3 - 4 | |
begin 7 - | |
begin 2 - | |
end 1 - | |
end 2 - | |
end 3 - 2 | |
end 3 - 1 | |
end 3 - 3 | |
end 3 - 4 | |
end 4 - 1 | |
end 4 - 2 | |
end 5 - | |
end 7 - | |
end 8 - | |
end 9 - |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment