Skip to content

Instantly share code, notes, and snippets.

@abby-sergz
Last active April 3, 2017 08:37
Show Gist options
  • Save abby-sergz/12a2c1d25fc93c17838c878a11d7fe89 to your computer and use it in GitHub Desktop.
Save abby-sergz/12a2c1d25fc93c17838c878a11d7fe89 to your computer and use it in GitHub Desktop.
SSCCE: SyncCollection, ActiveObject and AsyncExecutor
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <list>
#include <mutex>
#include <thread>
#include <utility>
namespace util {
// Blocking FIFO wrapper around a sequence container, guarded by a mutex and a
// condition variable. For the sake of simplicity, only basic functionality is
// provided.
//
// Container must offer value_type, push_back(), front(), pop_front() and
// empty() (e.g. std::list or std::deque).
template<typename Container>
class SynchronizedCollection {
public:
    typedef typename Container::value_type value_type;

    // Appends a copy of `value` and wakes one thread waiting in pop_front().
    void push_back(const value_type& value) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_collection.push_back(value);
        }
        // The mutex is released before notifying.
        m_conditionVar.notify_one();
    }

    // Appends `value` by moving it in and wakes one thread waiting in pop_front().
    void push_back(value_type&& value) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_collection.push_back(std::move(value));
        }
        m_conditionVar.notify_one();
    }

    // Blocks until the collection is non-empty, then removes and returns the
    // first element.
    value_type pop_front() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_conditionVar.wait(lock, [this]()->bool{
            return !m_collection.empty();
        });
        // Move instead of copy: the element is erased on the next line anyway,
        // and moving also makes move-only value types usable.
        value_type retValue = std::move(m_collection.front());
        m_collection.pop_front();
        return retValue;
    }

private:
    Container m_collection;
    std::mutex m_mutex;                     // guards m_collection
    std::condition_variable m_conditionVar; // signalled on every push_back()
};
// Sequentailly executes all calls in a single worker thread.
// Attention! It waits for finishing of all already posted calls.
class Active final {
public:
typedef std::function<void()> Call;
Active()
: m_isRunning(true)
{
m_thread = std::thread([this] {
threadFunc();
});
}
~Active() {
post([this]{
m_isRunning = false;
});
m_thread.join();
}
void post(const Call& call) {
if (!call)
return;
m_calls.push_back(call);
}
void post(Call&& call) {
if (!call)
return;
m_calls.push_back(std::move(call));
}
private:
void threadFunc() {
while (m_isRunning) {
Call call = m_calls.pop_front();
try {
call();
} catch (...) {
// do nothing, but the thread will be alive.
}
}
}
private:
bool m_isRunning;
SynchronizedCollection<std::list<Call>> m_calls;
std::thread m_thread;
};
// Spawns a new thread for each task and waits in the destructor until all
// spawned threads have been handed over to the collector.
//
// A dispatched call must not throw: an exception escaping the function run by
// a std::thread results in std::terminate.
class AsyncExecutor final {
    // This class not only serializes access to the list of threads but also
    // ensures that internals of std::thread are valid in another (collecting)
    // thread. The latter is about sentries protecting A and B because
    // otherwise it can happen that a worker thread has already finished the
    // call, passed info to the collector thread and the collector is trying
    // to get information from iterator (B) but the assignment (A) has not
    // happened yet.
    class SyncThreads final
    {
        typedef std::list<std::thread> Threads;
    public:
        typedef Threads::iterator iterator;
        typedef Threads::const_iterator const_iterator;

        // Starts a thread running `task`, passing it the iterator of its own
        // list entry. Rethrows whatever the std::thread constructor throws.
        void spawn_thread(std::function<void(iterator)>&& task) {
            std::lock_guard<std::mutex> lock(m_mutex);
            auto ii_thread = m_collection.emplace(m_collection.end());
            try {
                *ii_thread = /* A */ std::thread(std::move(task), ii_thread);
            } catch (...) {
                // Without this cleanup the empty placeholder would stay in
                // the list forever and wait_until_empty() would never return.
                m_collection.erase(ii_thread);
                throw;
            }
            // no need to notify here with the current usage of that class.
        }

        // Removes the entry at `pos` and returns the thread object it held.
        // Wakes a thread blocked in wait_until_empty().
        std::thread take_out(iterator pos) {
            std::thread retValue;
            {
                std::lock_guard<std::mutex> lock(m_mutex);
                retValue = /* B */ std::move(*pos);
                m_collection.erase(pos);
            }
            m_conditionVar.notify_one();
            return retValue;
        }

        // Blocks until the list of threads is empty.
        void wait_until_empty() {
            std::unique_lock<std::mutex> lock(m_mutex);
            m_conditionVar.wait(lock, [this]()->bool {
                return m_collection.empty();
            });
        }

    private:
        Threads m_collection;
        std::mutex m_mutex;                     // guards m_collection
        std::condition_variable m_conditionVar; // signalled by take_out()
    };

public:
    // Blocks until every spawned thread has been taken out of the list. The
    // last join may still be in progress on the collector thread at that
    // point; destroying m_threadCollector afterwards waits for it.
    ~AsyncExecutor() {
        m_threads.wait_until_empty();
    }

    // Runs a copy of `call` on a freshly spawned thread; empty callables are
    // ignored. When `call` returns, the thread asks the collector to remove
    // and join it.
    void dispatch(const std::function<void()>& call) {
        if (!call)
            return;
        m_threads.spawn_thread([this, call](SyncThreads::iterator ii_thread) {
            call();
            // A thread cannot join itself, so the join is delegated to the
            // collector's worker thread.
            m_threadCollector.post([this, ii_thread] {
                m_threads.take_out(ii_thread).join();
            });
        });
    }

private:
    // Declaration order matters: m_threadCollector is destroyed first (joining
    // its worker) while m_threads is still alive for pending take_out() calls.
    SyncThreads m_threads;
    Active m_threadCollector;
};
}
#include <iostream>
#include <chrono>
#include <string>
// Demo driver: dispatches a batch of sleeping tasks through an AsyncExecutor.
// Each task reports "begin" and "end" via syncOutput, which prints the
// messages one at a time on its own worker thread.
void test() {
    // Comment toggle: remove one leading slash from the next line to replace
    // the real serializer with the no-op stand-in below.
    //*
    util::Active syncOutput;
    /*/
    // no op
    struct {
    void post(const util::Active::Call&){};
    } syncOutput;
    //*/

    // Announces the start, sleeps for `secs` seconds, then announces the end.
    auto report = [&syncOutput](int secs, const std::string label) {
        syncOutput.post([secs, label]{
            std::cout << "begin " << secs << " - " << label << std::endl;
        });
        std::this_thread::sleep_for(std::chrono::seconds(secs));
        syncOutput.post([secs, label]{
            std::cout << "end " << secs << " - " << label << std::endl;
        });
    };

    // Sleep duration and label of every task, in dispatch order.
    struct Job {
        int secs;
        const char* label;
    };
    const Job jobs[] = {
        {1, ""},  {4, "1"}, {5, ""},  {9, ""},
        {3, "1"}, {4, "2"}, {8, ""},  {3, "2"},
        {3, "3"}, {3, "4"}, {7, ""},  {2, ""},
    };

    for (int round = 0; round < 1; ++round) {
        // The executor's destructor at the end of each round blocks until
        // every dispatched task has finished.
        util::AsyncExecutor executor;
        for (const Job& job : jobs) {
            executor.dispatch(std::bind(report, job.secs, job.label));
        }
    }
}
// Program entry point: runs the demo once and exits with status 0.
int main()
{
    test();
    return 0;
}
begin 1 -
begin 4 - 1
begin 5 -
begin 9 -
begin 3 - 1
begin 4 - 2
begin 8 -
begin 3 - 2
begin 3 - 3
begin 3 - 4
begin 7 -
begin 2 -
end 1 -
end 2 -
end 3 - 2
end 3 - 1
end 3 - 3
end 3 - 4
end 4 - 1
end 4 - 2
end 5 -
end 7 -
end 8 -
end 9 -
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment