// Gist niklasgu/188547 by @niklasgu, created September 17, 2009 16:04.
// (GitHub page navigation text removed; the gist's source files follow.)
#pragma once
using namespace ::Concurrency;
namespace concurrency_extras
{
/// <summary>
/// A counting event -- the event is set whenever the internal count is zero, unset when
/// the counter > 0.
/// </summary>
/// <remarks>
/// Only the prefix forms of ++ and -- are provided, and both return the count *after*
/// the update. Every counter update runs under an internal critical section, so the
/// counter and the event state always change together.
/// </remarks>
class countingevent
{
public:
    /// <summary>
    /// Constructor. The count starts at zero, so the event starts out set.
    /// </summary>
    countingevent() : m_count(0) { m_event.set(); }

    /// <summary>
    /// Increment the counter. The event is reset when the count leaves zero.
    /// </summary>
    /// <returns>The count after the increment.</returns>
    __int64 operator ++ ()
    {
        critical_section::scoped_lock lock(m_lock);
        if (++m_count == 1)
            m_event.reset();
        return static_cast<__int64>(m_count);
    }

    /// <summary>
    /// Decrement the counter. The event is set when the count reaches zero.
    /// Decrementing a counter that is already zero is a no-op.
    /// </summary>
    /// <returns>The count after the decrement (zero if it was already zero).</returns>
    __int64 operator -- ()
    {
        critical_section::scoped_lock lock(m_lock);
        // The counter is unsigned: decrementing from zero would wrap around to a huge
        // value and leave the event out of sync with the count. The count is already
        // zero here and the event is therefore already set, so there is nothing to do.
        if (m_count == 0)
            return 0;
        if (--m_count == 0)
            m_event.set();
        return static_cast<__int64>(m_count);
    }

    /// <summary>
    /// Wait for the event to be set, i.e. for the count to be zero.
    /// </summary>
    void wait()
    {
        m_event.wait();
    }

private:
    event m_event;             // Set while m_count == 0, reset while m_count > 0.
    critical_section m_lock;   // Serializes updates to m_count and m_event.
    unsigned __int64 m_count;  // Current count.
};
}
#include <windows.h>
#undef Yield
#include <concurrent_queue.h>
#pragma once
using namespace ::Concurrency;
namespace concurrency_extras
{
/// <summary>
/// A wrapper around the thread pool that offers support for scheduling functor-based
/// tasks serially, guaranteeing that only one worker thread is used for each task
/// group. There is no guarantee that the thread is the same from task to task, but
/// there will never be two simultaneous worker threads.
///
/// There is no support for work stealing, but unlike the thread pool, the serial task
/// group does support cancellation and waiting for outstanding work to finish.
/// </summary>
// Forward declaration so that _serial_task_info can point back at its owning group.
class serial_task_group;
/// <summary>
/// Queue record describing one pending task of a serial_task_group.
/// </summary>
struct _serial_task_info
{
// Untyped pointer to the task's payload. serial_task_group stores it when the task
// is queued and casts it back to a concrete type before running and freeing it.
void *pFunc;
// The group that queued this task.
serial_task_group *pTaskGroup;
};
class serial_task_group
{
public:
/// <summary>
/// Schedules a lightweight task expressed as a 'Func' on this scheduler, typically used with
/// C++ lambda expressions.
/// </summary>
/// <param name="fn">A functor for the task to run.</param>
template <class Func>
void run(const Func& fn)
{
__int64 tasks = taskCount++;
_Schedule_task(fn, tasks == 1);
}
/// <summary>
/// Wait for outstanding (already queued) work to finish.
/// </summary>
void wait()
{
taskCount.wait();
}
/// <summary>
/// Cancel all work that hasn't already started, and return when the
/// work already underway is complete. Does not affect future work.
/// </summary>
void cancel()
{
InterlockedIncrement(&this->canceling);
taskCount.wait();
InterlockedDecrement(&this->canceling);
}
/// <summary>
/// Constructs a new serial_task_group.
/// </summary>
serial_task_group() : scheduler(NULL), canceling(0)
{
}
serial_task_group(Scheduler *scheduler) : scheduler(scheduler), canceling(0)
{
}
private:
//disable copy constructor and assignment operators
serial_task_group(const serial_task_group&);
serial_task_group const & operator=(serial_task_group const&);
countingevent taskCount;
Scheduler *scheduler;
volatile unsigned __int64 canceling;
concurrent_queue<_serial_task_info *> queue;
template<class Func>
static void __cdecl _Task_proc(void* data)
{
concurrent_queue<_serial_task_info *> *queue = (concurrent_queue<_serial_task_info *> *)data;
_serial_task_info *pInfo;
long tasks = 0;
serial_task_group *pTaskGroup = NULL;
// Loop until there is no more work to perform.
do
{
while (!queue->try_pop(pInfo))
{
Context::CurrentContext()->Yield();
}
Func* pFunc = (Func*) pInfo->pFunc;
pTaskGroup = ((_serial_task_info*) pInfo)->pTaskGroup;
if (InterlockedXor(&pTaskGroup->canceling, 0L) == 0)
(*pFunc)();
delete pFunc;
delete pInfo;
tasks = pTaskGroup->taskCount--;
} while (tasks > 0);
}
template <class Func>
void _Schedule_task(const Func& fn, bool createWorker)
{
// Create a task info record and queue it up.
_serial_task_info *pInfo = new _serial_task_info;
pInfo->pFunc = new Func(fn);
pInfo->pTaskGroup = this;
queue.push(pInfo);
// Request a new worker thread if necessary.
if (createWorker)
{
if (scheduler != NULL)
{
scheduler->ScheduleTask(_Task_proc<Func>, &queue);
}
else
{
CurrentScheduler::ScheduleTask(_Task_proc<Func>, &queue);
}
}
}
};
}
// End of gist niklasgu/188547.