Created September 17, 2009 16:04
#pragma once

#include <concrt.h>    // Concurrency::event, Concurrency::critical_section

using namespace ::Concurrency;

namespace concurrency_extras
{
    /// <summary>
    /// A counting event -- the event is set whenever the internal count is zero, and unset
    /// while the count is greater than zero.
    /// </summary>
    class countingevent
    {
    public:
        /// <summary>
        /// Increment the counter.
        /// </summary>
        __int64 operator ++ ()
        {
            critical_section::scoped_lock lock(m_lock);
            if (++m_count == 1)
                m_event.reset();
            return m_count;
        }

        /// <summary>
        /// Decrement the counter.
        /// </summary>
        __int64 operator -- ()
        {
            critical_section::scoped_lock lock(m_lock);
            // TODO: prevent the counter from being decremented below zero.
            if (--m_count == 0)
                m_event.set();
            return m_count;
        }

        /// <summary>
        /// Wait for the event to be set.
        /// </summary>
        void wait()
        {
            m_event.wait();
        }

        /// <summary>
        /// Constructor. The count starts at zero, so the event starts out set.
        /// </summary>
        countingevent() : m_count(0) { m_event.set(); }

    private:
        event m_event;
        critical_section m_lock;
        unsigned __int64 m_count;
    };
}
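A minimal usage sketch for countingevent (not part of the original gist; the names countingevent_example and do_work are illustrative): the event starts out set, the prefix increment unsets it, and wait() blocks until a matching prefix decrement brings the count back to zero.

#include <concrt.h>

static void __cdecl do_work(void *p)
{
    concurrency_extras::countingevent *pending =
        static_cast<concurrency_extras::countingevent *>(p);
    // ... perform the actual work here ...
    --(*pending);    // signal completion by decrementing the count
}

void countingevent_example()
{
    concurrency_extras::countingevent pending;

    ++pending;    // one outstanding work item; the event is now unset
    Concurrency::CurrentScheduler::ScheduleTask(do_work, &pending);

    pending.wait();    // blocks until do_work has decremented the count back to zero
}

The serial_task_group below relies on the same idea: its taskCount member tracks queued tasks so that wait() can block until the queue drains.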
#pragma once

#include <windows.h>
#undef Yield
#include <concrt.h>             // Scheduler, CurrentScheduler, Context
#include <concurrent_queue.h>

// Note: also requires the countingevent class defined above.

using namespace ::Concurrency;

namespace concurrency_extras
{
    /// <summary>
    /// A wrapper around the thread pool that offers support for scheduling functor-based
    /// tasks serially, guaranteeing that only one worker thread is used for each task
    /// group. There is no guarantee that the thread is the same from task to task, but
    /// there will never be two simultaneous worker threads.
    ///
    /// There is no support for work stealing, but unlike the thread pool, the serial task
    /// group does support cancellation and waiting for outstanding work to finish.
    /// </summary>
    class serial_task_group;

    struct _serial_task_info
    {
        void *pFunc;
        serial_task_group *pTaskGroup;
    };

    class serial_task_group
    {
    public:
        /// <summary>
        /// Schedules a lightweight task expressed as a 'Func' on this scheduler, typically
        /// used with C++ lambda expressions.
        /// </summary>
        /// <param name="fn">A functor for the task to run.</param>
        template <class Func>
        void run(const Func& fn)
        {
            // Prefix increment returns the new count; a count of 1 means the queue was
            // empty, so a new worker thread must be requested.
            __int64 tasks = ++taskCount;
            _Schedule_task(fn, tasks == 1);
        }

        /// <summary>
        /// Wait for outstanding (already queued) work to finish.
        /// </summary>
        void wait()
        {
            taskCount.wait();
        }

        /// <summary>
        /// Cancel all work that hasn't already started, and return when the
        /// work already underway is complete. Does not affect future work.
        /// </summary>
        void cancel()
        {
            InterlockedIncrement(&this->canceling);
            taskCount.wait();
            InterlockedDecrement(&this->canceling);
        }

        /// <summary>
        /// Constructs a new serial_task_group.
        /// </summary>
        serial_task_group() : scheduler(NULL), canceling(0)
        {
        }

        serial_task_group(Scheduler *scheduler) : scheduler(scheduler), canceling(0)
        {
        }

    private:
        // Disable the copy constructor and assignment operator.
        serial_task_group(const serial_task_group&);
        serial_task_group const & operator=(serial_task_group const&);

        countingevent taskCount;
        Scheduler *scheduler;
        // Declared as a 32-bit long so it matches the InterlockedIncrement,
        // InterlockedDecrement and InterlockedXor calls used with it.
        volatile long canceling;
        concurrent_queue<_serial_task_info *> queue;

        template<class Func>
        static void __cdecl _Task_proc(void* data)
        {
            concurrent_queue<_serial_task_info *> *queue = (concurrent_queue<_serial_task_info *> *)data;
            _serial_task_info *pInfo;
            __int64 tasks = 0;
            serial_task_group *pTaskGroup = NULL;

            // Loop until there is no more work to perform.
            do
            {
                while (!queue->try_pop(pInfo))
                {
                    Context::CurrentContext()->Yield();
                }

                Func* pFunc = (Func*) pInfo->pFunc;
                pTaskGroup = pInfo->pTaskGroup;

                // Skip the functor (but still drain the queue) while a cancel is in progress.
                if (InterlockedXor(&pTaskGroup->canceling, 0L) == 0)
                    (*pFunc)();

                delete pFunc;
                delete pInfo;

                // Prefix decrement returns the new count; the worker exits when it reaches zero.
                tasks = --pTaskGroup->taskCount;
            } while (tasks > 0);
        }

        template <class Func>
        void _Schedule_task(const Func& fn, bool createWorker)
        {
            // Create a task info record and queue it up.
            _serial_task_info *pInfo = new _serial_task_info;
            pInfo->pFunc = new Func(fn);
            pInfo->pTaskGroup = this;
            queue.push(pInfo);

            // Request a new worker thread if necessary.
            if (createWorker)
            {
                if (scheduler != NULL)
                {
                    scheduler->ScheduleTask(_Task_proc<Func>, &queue);
                }
                else
                {
                    CurrentScheduler::ScheduleTask(_Task_proc<Func>, &queue);
                }
            }
        }
    };
}
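A minimal usage sketch for serial_task_group (not part of the original gist; serial_task_group_example is an illustrative name): functors queued with run() execute one at a time in the order they were queued, though not necessarily on the same thread, and wait() blocks until everything queued so far has finished.

#include <cstdio>

void serial_task_group_example()
{
    concurrency_extras::serial_task_group group;

    // These lambdas run serially, never two at once, in FIFO order.
    for (int i = 0; i < 10; ++i)
    {
        group.run([i] { std::printf("task %d\n", i); });
    }

    group.wait();    // block until all ten tasks have completed

    // group.cancel() would instead drop any queued tasks that have not started
    // and return once the task currently running (if any) has finished.
}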