#undef Yield
#include <concurrent_queue.h>
#pragma once
using namespace ::Concurrency;
namespace concurrency_extras {
/// <summary>
/// A task group that offers support for scheduling functor-based
/// tasks serially on the thread associated with a specific window handle.
///
/// The UI task group supports cancellation and waiting for outstanding work to finish.
/// </summary>
class ui_task_group;

/// <summary>
/// Record placed in a ui_task_group's work queue; one record is allocated
/// for every task that is scheduled.
/// </summary>
struct _task_info
{
    /// Untyped pointer to the task's callable payload. Its concrete type is
    /// known only to the code that enqueues and dequeues the record.
    void* pFunc;

    /// Task group the task was scheduled on.
    ui_task_group* pTaskGroup;
};
class ui_task_group
{
public:
/// <summary>
/// Schedules a lightweight task expressed as a 'Func' on this task group, typically used with
/// C++ lambda expressions.
/// </summary>
/// <param name="fn">A functor for the task to run.</param>
template <class Func>
void run(const Func& fn)
{
long tasks = taskCount++;
_Schedule_task(fn, tasks == 1);
}
/// <summary>
/// Wait for outstanding (already queued) work to finish.
/// </summary>
void wait()
{
taskCount.wait();
}
/// <summary>
/// Cancel all work that hasn't already started, and return when the
/// work already underway is complete. Does not affect future work.
/// </summary>
void cancel()
{
InterlockedIncrement(&this->canceling);
taskCount.wait();
InterlockedDecrement(&this->canceling);
}
/// <summary>
/// Constructs a new ui_task_group.
/// </summary>
ui_task_group(HWND hWnd, int msg) : canceling(0), hWnd(hWnd), msg(msg)
{
}
private:
//disable copy constructor and assignment operators
ui_task_group(const ui_task_group&);
ui_task_group const & operator=(ui_task_group const&);
HWND hWnd;
int msg;
countingevent taskCount;
volatile unsigned __int64 canceling;
concurrent_queue<_task_info *> queue;
template<class Func>
static void CALLBACK _Task_proc(void* data)
{
concurrent_queue<_task_info *> *queue = (concurrent_queue<_task_info *> *)data;
_task_info *pInfo;
long tasks = 0;
ui_task_group *pTaskGroup = NULL;
// Loop until there is no more work to perform.
do
{
while (!queue->try_pop(pInfo))
{
Context::CurrentContext()->Yield();
}
Func* pFunc = (Func*) pInfo->pFunc;
pTaskGroup = ((_task_info*) pInfo)->pTaskGroup;
if (InterlockedXor(&pTaskGroup->canceling, 0L) == 0)
(*pFunc)();
delete pFunc;
delete pInfo;
tasks = pTaskGroup->taskCount--;
} while (tasks > 0);
}
template <class Func>
void _Schedule_task(const Func& fn, bool createWorker)
{
// Create a task info record and queue it up.
_task_info *pInfo = new _task_info;
pInfo->pFunc = new Func(fn);
pInfo->pTaskGroup = this;
queue.push(pInfo);
// Request a new worker thread if necessary.
if (createWorker)
PostMessage(hWnd, msg, (WPARAM)(LPTHREAD_START_ROUTINE)_Task_proc<Func>, (LPARAM)&queue);
}
};
}