niklasgu (owner)

Revisions

gist: 188547 Download_button fork
public
Public Clone URL: git://gist.github.com/188547.git
Embed All Files: show embed
countingevent.h #
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#pragma once
 
using namespace ::Concurrency;
 
namespace concurrency_extras
{
    /// <summary>
    /// A counting event -- the event is set whenever the internal count is zero, unset when
    /// the counter > 0.
    /// </summary>
    class countingevent
    {
    public:
        /// <summary>
        /// Increment the counter.
        /// </summary>
        __int64 operator ++ ()
        {
            critical_section::scoped_lock lock(m_lock);
            if (++m_count == 1)
                m_event.reset();
            return m_count;
        }
 
        /// <summary>
        /// Decrement the counter.
        /// </summary>
        __int64 operator -- ()
        {
            critical_section::scoped_lock lock(m_lock);
            // TODO: prevent the counter to be decremented from zero.
            if (--m_count == 0)
                m_event.set();
            return m_count;
        }
 
        /// <summary>
        /// Wait for the event to be set.
        /// </summary>
        void wait()
        {
            m_event.wait();
        }
 
        /// <summary>
        /// Constructor
        /// </summary>
        countingevent() : m_count(0) { m_event.set(); }
 
    private:
 
        event m_event;
        critical_section m_lock;
        unsigned __int64 m_count;
    };
}
 
task_scheduler.h #
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#include <windows.h>
#undef Yield
#include <concurrent_queue.h>
#pragma once
 
using namespace ::Concurrency;
 
namespace concurrency_extras
{
    /// <summary>
    /// A wrapper around the thread pool that offers support for scheduling functor-based
    /// tasks serially, guaranteeing that only one worker thread is used for each task
    /// group. There is no guarantee that the thread is the same from task to task, but
    /// there will never be two simultaneous worker threads.
    ///
    /// There is no support for work stealing, but unlike the thread pool, the serial task
    /// group does support cancellation and waiting for outstanding work to finish.
    /// </summary>
 
    class serial_task_group;
 
    struct _serial_task_info
    {
        void *pFunc;
        serial_task_group *pTaskGroup;
    };
 
    class serial_task_group
    {
    public:
        /// <summary>
        /// Schedules a lightweight task expressed as a 'Func' on this scheduler, typically used with
        /// C++ lambda expressions.
        /// </summary>
        /// <param name="fn">A functor for the task to run.</param>
        template <class Func>
        void run(const Func& fn)
        {
            __int64 tasks = taskCount++;
            _Schedule_task(fn, tasks == 1);
        }
        /// <summary>
        /// Wait for outstanding (already queued) work to finish.
        /// </summary>
        void wait()
        {
            taskCount.wait();
        }
        /// <summary>
        /// Cancel all work that hasn't already started, and return when the
        /// work already underway is complete. Does not affect future work.
        /// </summary>
        void cancel()
        {
            InterlockedIncrement(&this->canceling);
            taskCount.wait();
            InterlockedDecrement(&this->canceling);
        }
        /// <summary>
        /// Constructs a new serial_task_group.
        /// </summary>
        serial_task_group() : scheduler(NULL), canceling(0)
        {
        }
 
        serial_task_group(Scheduler *scheduler) : scheduler(scheduler), canceling(0)
        {
        }
 
    private:
        //disable copy constructor and assignment operators
        serial_task_group(const serial_task_group&);
        serial_task_group const & operator=(serial_task_group const&);
        
        countingevent taskCount;
 
        Scheduler *scheduler;
 
        volatile unsigned __int64 canceling;
        concurrent_queue<_serial_task_info *> queue;
 
        template<class Func>
        static void __cdecl _Task_proc(void* data)
        {
            concurrent_queue<_serial_task_info *> *queue = (concurrent_queue<_serial_task_info *> *)data;
            _serial_task_info *pInfo;
            
            long tasks = 0;
            serial_task_group *pTaskGroup = NULL;
 
            // Loop until there is no more work to perform.
            do
            {
                while (!queue->try_pop(pInfo))
                {
                    Context::CurrentContext()->Yield();
                }
                
                Func* pFunc = (Func*) pInfo->pFunc;
                pTaskGroup = ((_serial_task_info*) pInfo)->pTaskGroup;
            
                if (InterlockedXor(&pTaskGroup->canceling, 0L) == 0)
                    (*pFunc)();
 
                delete pFunc;
                delete pInfo;
                
                tasks = pTaskGroup->taskCount--;
            
            } while (tasks > 0);
        }
 
        template <class Func>
        void _Schedule_task(const Func& fn, bool createWorker)
        {
            // Create a task info record and queue it up.
            _serial_task_info *pInfo = new _serial_task_info;
            pInfo->pFunc = new Func(fn);
            pInfo->pTaskGroup = this;
            queue.push(pInfo);
 
            // Request a new worker thread if necessary.
            if (createWorker)
            {
                if (scheduler != NULL)
                {
                    scheduler->ScheduleTask(_Task_proc<Func>, &queue);
                }
                else
                {
                    CurrentScheduler::ScheduleTask(_Task_proc<Func>, &queue);
                }
            }
        }
    };
}