niklasgu (owner)

Revisions

gist: 228178 Download_button fork
public
Public Clone URL: git://gist.github.com/228178.git
Embed All Files: show embed
uitaskgroup.h #
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#undef Yield
#include <concurrent_queue.h>
#pragma once
 
using namespace ::Concurrency;
 
namespace concurrency_extras {
    /// <summary>
    /// A task group that offers support for scheduling functor-based
    /// tasks serially on the thread associated with a specific window handle.
    ///
    /// The UI task group supports cancellation and waiting for outstanding work to finish.
    /// </summary>
    class ui_task_group;
    /// <summary>
    /// Record describing one scheduled task. ui_task_group allocates a record per
    /// task in _Schedule_task, passes it through its queue, and frees it in
    /// _Task_proc once the task has been processed.
    /// </summary>
    struct _task_info
    {
        // Type-erased pointer to a heap-allocated copy of the task's functor
        // (created with 'new Func(fn)' by ui_task_group::_Schedule_task).
        void *pFunc;
        // The task group that scheduled this task; the worker uses it to reach
        // the group's cancellation flag and outstanding-task counter.
        ui_task_group *pTaskGroup;
    };
    class ui_task_group
    {
    public:
        /// <summary>
        /// Schedules a lightweight task expressed as a 'Func' on this task group, typically used with
        /// C++ lambda expressions.
        /// </summary>
        /// <param name="fn">A functor for the task to run.</param>
        template <class Func>
        void run(const Func& fn)
        {
            long tasks = taskCount++;
            _Schedule_task(fn, tasks == 1);
        }
        /// <summary>
        /// Wait for outstanding (already queued) work to finish.
        /// </summary>
        void wait()
        {
            taskCount.wait();
        }
        /// <summary>
        /// Cancel all work that hasn't already started, and return when the
        /// work already underway is complete. Does not affect future work.
        /// </summary>
        void cancel()
        {
            InterlockedIncrement(&this->canceling);
            taskCount.wait();
            InterlockedDecrement(&this->canceling);
        }
        /// <summary>
        /// Constructs a new ui_task_group.
        /// </summary>
        ui_task_group(HWND hWnd, int msg) : canceling(0), hWnd(hWnd), msg(msg)
        {
        }
 
    private:
        //disable copy constructor and assignment operators
        ui_task_group(const ui_task_group&);
        ui_task_group const & operator=(ui_task_group const&);
        
        HWND hWnd;
int msg;
 
        countingevent taskCount;
        volatile unsigned __int64 canceling;
        concurrent_queue<_task_info *> queue;
 
        template<class Func>
        static void CALLBACK _Task_proc(void* data)
        {
            concurrent_queue<_task_info *> *queue = (concurrent_queue<_task_info *> *)data;
            _task_info *pInfo;
 
            long tasks = 0;
            ui_task_group *pTaskGroup = NULL;
 
            // Loop until there is no more work to perform.
            do
            {
                while (!queue->try_pop(pInfo))
                {
                    Context::CurrentContext()->Yield();
                }
                
                Func* pFunc = (Func*) pInfo->pFunc;
                pTaskGroup = ((_task_info*) pInfo)->pTaskGroup;
                
                if (InterlockedXor(&pTaskGroup->canceling, 0L) == 0)
                    (*pFunc)();
                
                delete pFunc;
                delete pInfo;
            
                tasks = pTaskGroup->taskCount--;
 
            } while (tasks > 0);
        }
 
        template <class Func>
        void _Schedule_task(const Func& fn, bool createWorker)
        {
            // Create a task info record and queue it up.
            _task_info *pInfo = new _task_info;
            pInfo->pFunc = new Func(fn);
            pInfo->pTaskGroup = this;
 
            queue.push(pInfo);
 
            // Request a new worker thread if necessary.
            if (createWorker)
                PostMessage(hWnd, msg, (WPARAM)(LPTHREAD_START_ROUTINE)_Task_proc<Func>, (LPARAM)&queue);
        }
    };
}