// g++ `pkg-config --cflags --libs glib-2.0` -pthread semproc++.cc -o semproc
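// run: ./semproc -- each process cycle prints which thread runs which node and the elapsed time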
/* Example of processing a directed acyclic graph in parallel, as described in
* Chapter 17.3 of http://www.theses.fr/2017PA080116, page 131-135
* https://gareus.org/misc/thesis-p8/2017-12-Gareus-Lat.pdf
*
* (C) 2017, 2019 Robin Gareus <robin@gareus.org>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
/* Number of worker threads to start (in addition to main thread). Can be 0.
* This should be user configurable and set to "available CPU cores - 1"
*/
#define N_WORKERS (3)
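/* Note: this could, for example, be derived at runtime via glib's
* g_get_num_processors() or POSIX sysconf(_SC_NPROCESSORS_ONLN);
* a fixed value keeps the example simple.
*/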
#include <cassert>
#include <cstdio>
#include <glib.h>
#include <pthread.h>
#include <sched.h>
#include <semaphore.h>
#include <stdint.h>
#include <stdlib.h>
#include <time.h>
#include <algorithm>
#include <list>
#include <set>
#include <string>
#include <vector>
/** Lock-free multiple-producer, multiple-consumer queue
*
* inspired by http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
* Kudos to Dmitry Vyukov
*/
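/* How the cells work (as implemented below):
* - clear() initializes cell[i]._sequence = i, i.e. each cell is "free" for
*   the producer whose enqueue position equals the cell's sequence number.
* - push_back(): a producer claims position 'pos' via compare-and-swap on
*   _enqueue_pos, stores the data and publishes it by setting the cell's
*   sequence to pos + 1. If the cell's sequence is behind 'pos' the queue is
*   full (asserted here, since the queue is sized for the whole graph).
* - dequeue(): a consumer waits for sequence == pos + 1, claims 'pos' via
*   compare-and-swap on _dequeue_pos, reads the data and recycles the cell by
*   setting its sequence to pos + buffer-size (ready for the next lap).
* No locks are taken; threads only contend on the two position counters.
*/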
template <typename T>
class MPMCQueue
{
public:
MPMCQueue (size_t buffer_size = 8)
: _buffer (0)
, _buffer_mask (0)
{
reserve (buffer_size);
}
~MPMCQueue ()
{
delete[] _buffer;
}
static size_t
power_of_two_size (size_t sz)
{
int32_t power_of_two;
for (power_of_two = 1; 1U << power_of_two < sz; ++power_of_two)
;
return 1U << power_of_two;
}
void
reserve (size_t buffer_size)
{
buffer_size = power_of_two_size (buffer_size);
assert ((buffer_size >= 2) && ((buffer_size & (buffer_size - 1)) == 0));
if (_buffer_mask >= buffer_size - 1) {
return;
}
delete[] _buffer;
_buffer = new cell_t[buffer_size];
_buffer_mask = buffer_size - 1;
clear ();
}
void
clear ()
{
for (size_t i = 0; i <= _buffer_mask; ++i) {
g_atomic_int_set (&_buffer[i]._sequence, i);
}
g_atomic_int_set (&_enqueue_pos, 0);
g_atomic_int_set (&_dequeue_pos, 0);
}
bool
push_back (T const& data)
{
cell_t* cell;
guint pos = g_atomic_int_get (&_enqueue_pos);
for (;;) {
cell = &_buffer[pos & _buffer_mask];
guint seq = g_atomic_int_get (&cell->_sequence);
intptr_t dif = (intptr_t)seq - (intptr_t)pos;
if (dif == 0) {
if (g_atomic_int_compare_and_exchange (&_enqueue_pos, pos, pos + 1)) {
break;
}
} else if (dif < 0) {
assert (0);
return false;
} else {
pos = g_atomic_int_get (&_enqueue_pos);
}
}
cell->_data = data;
g_atomic_int_set (&cell->_sequence, pos + 1);
return true;
}
bool
dequeue (T& data)
{
cell_t* cell;
guint pos = g_atomic_int_get (&_dequeue_pos);
for (;;) {
cell = &_buffer[pos & _buffer_mask];
guint seq = g_atomic_int_get (&cell->_sequence);
intptr_t dif = (intptr_t)seq - (intptr_t) (pos + 1);
if (dif == 0) {
if (g_atomic_int_compare_and_exchange (&_dequeue_pos, pos, pos + 1)) {
break;
}
} else if (dif < 0) {
return false;
} else {
pos = g_atomic_int_get (&_dequeue_pos);
}
}
data = cell->_data;
g_atomic_int_set (&cell->_sequence, pos + _buffer_mask + 1);
return true;
}
private:
struct cell_t {
volatile guint _sequence;
T _data;
};
cell_t* _buffer;
size_t _buffer_mask;
volatile guint _enqueue_pos;
volatile guint _dequeue_pos;
};
class Graph;
/** A node of the process graph */
class GraphNode {
public:
GraphNode (Graph* g, std::string const& n)
: _graph (g)
, _name (n)
, _refcount (0)
, _init_refcount (0)
{
/* fake some work */
_work_delay = 1 + rand () % 10;
}
void run ()
{
process ();
finish ();
}
const char* cname () const { return _name.c_str (); }
/* Setup methods to add graph edges.
* These must only be called during initialization.
*
* Note that this example does not include topological sorting.
* It is left to the graph-setup to order the DAG.
*/
/** add outgoing graph edge */
void add_feeds (GraphNode* g)
{
_childnodes.insert (g);
}
/** add incoming graph edge */
void add_depends ()
{
++_init_refcount;
_refcount = _init_refcount;
}
private:
/* custom data to process this node.
* Here a sleep/delay in msec.
*/
gulong _work_delay;
protected:
virtual void process ();
private:
void finish ();
void trigger ();
Graph* _graph; ///< parent Graph
std::string _name; ///< node name
/** outgoing edges
* downstream nodes to activate when this node has completed processing */
std::set<GraphNode*> _childnodes;
/* upstream nodes reference count */
gint _refcount; ///< count-down (unprocessed upstream)
gint _init_refcount; ///< number of incoming edges
};
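/* Illustrative sketch (not used in this example): the subclassing approach
* mentioned in setup_graph() below -- override GraphNode::process() with the
* actual work. Note that Graph::add_node() stores GraphNode by value, so
* using subclasses would also require changing Graph to hold nodes by pointer.
*/
class YourProcessNode : public GraphNode
{
public:
YourProcessNode (Graph* g, std::string const& n)
: GraphNode (g, n)
{}
protected:
void process () override
{
/* perform the actual work for this node here */
}
};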
/** Graph -- manage threads, schedule work and synchronize entry/exit */
class Graph {
public:
Graph (unsigned int n_worker_threads)
: _n_workers (n_worker_threads)
, _n_terminal_nodes (0)
, _setup_n_terminal_nodes (0)
{
sem_init (&_callback_start, 0, 0);
sem_init (&_callback_done, 0, 0);
sem_init (&_trigger, 0, 0);
g_atomic_int_set (&_terminal_refcnt, 0);
g_atomic_int_set (&_terminate, 0);
g_atomic_int_set (&_idle_thread_cnt, 0);
g_atomic_int_set (&_trigger_queue_size, 0);
}
~Graph ()
{
sem_destroy (&_callback_start);
sem_destroy (&_callback_done);
sem_destroy (&_trigger);
}
void process_graph ();
bool start ();
void terminate ();
/* setup, create and connect nodes */
GraphNode*
add_node (std::string const& name)
{
_setup_graph_nodes.push_back (GraphNode (this, name));
return &_setup_graph_nodes.back ();
}
GraphNode*
add_terminal_node (std::string const& name)
{
++_setup_n_terminal_nodes;
return add_node (name);
}
GraphNode*
add_initial_node (std::string const& name)
{
GraphNode* n = add_node (name);
_setup_init_trigger_list.push_back (n);
return n;
}
void clear_setup ()
{
_setup_graph_nodes.clear ();
_setup_init_trigger_list.clear ();
_setup_n_terminal_nodes = 0;
}
void rechain ()
{
assert (g_atomic_int_get (&_terminal_refcnt) == 0);
assert (g_atomic_int_get (&_trigger_queue_size) == 0);
_graph_nodes.swap (_setup_graph_nodes);
_init_trigger_list.swap (_setup_init_trigger_list);
_n_terminal_nodes = _setup_n_terminal_nodes;
g_atomic_int_set (&_terminal_refcnt, _n_terminal_nodes);
_trigger_queue.reserve (_graph_nodes.size ());
clear_setup ();
}
static void
connect (GraphNode* from, GraphNode* to)
{
from->add_feeds (to);
to->add_depends ();
}
private:
friend class GraphNode;
void trigger (GraphNode* n);
void reached_terminal_node ();
private:
static void*
start_main_thread (void* g)
{
static_cast<Graph*> (g)->main_thread ();
return NULL;
}
static void*
start_worker (void* g)
{
static_cast<Graph*> (g)->worker_thread ();
return NULL;
}
void worker_thread ();
void main_thread ();
pthread_t _main_thread;
std::vector<pthread_t> _worker_threads;
/* number of work-threads to start/started.
* After ::start() this is equivalent to
* _worker_threads.size ()
*/
int _n_workers;
/* flag to terminate background threads */
volatile gint _terminate;
/** List of all graph nodes (only used for memory management) */
std::list<GraphNode> _graph_nodes;
/** Nodes without incoming edges.
* These run concurrently at the start of each cycle to kick off processing */
std::list<GraphNode*> _init_trigger_list;
/** Synchronize with entry point */
sem_t _callback_start;
sem_t _callback_done;
/** Start worker threads */
sem_t _trigger;
/* Terminal node reference count */
volatile gint _terminal_refcnt; ///< remaining unprocessed terminal nodes in this cycle
size_t _n_terminal_nodes; ///< number of graph nodes without an outgoing edge
MPMCQueue<GraphNode*> _trigger_queue; ///< nodes that can be processed
volatile guint _trigger_queue_size; ///< number of entries in trigger-queue
volatile guint _idle_thread_cnt; ///< number of threads waiting for work
/* Chain used to setup in the background.
* This is applied and cleared by ::rechain()
*/
std::list<GraphNode> _setup_graph_nodes;
std::list<GraphNode*> _setup_init_trigger_list;
size_t _setup_n_terminal_nodes;
};
/* **************************
* GraphNode Implementation */
/* Process a node */
void
GraphNode::process ()
{
/* XXX the actual work is to be performed here XXX */
printf ("Thread 0x%zx: process '%s' (%lums)\n", pthread_self (), cname (), _work_delay);
g_usleep (1000 * _work_delay);
}
/* Called by an upstream node, when it has completed processing */
void
GraphNode::trigger ()
{
/* check if we can run */
if (g_atomic_int_dec_and_test (&_refcount)) {
/* reset reference count for next cycle */
g_atomic_int_set (&_refcount, _init_refcount);
/* all nodes that feed this node have completed, so this node can be processed now. */
_graph->trigger (this);
}
}
/* Completed processing this node, notify outgoing edges */
void
GraphNode::finish ()
{
bool feeds = false;
/* notify downstream nodes that depend on this node */
for (auto i : _childnodes) {
// printf ("%s activates %s\n", cname(), i->cname());
i->trigger ();
feeds = true;
}
if (!feeds) {
/* terminal node, notify graph */
_graph->reached_terminal_node ();
}
}
/* **********************
* Graph Implementation *
* *********************/
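/* Overview of one process cycle:
* - process_graph() posts _callback_start and blocks on _callback_done.
* - The thread that ended the previous cycle (initially main_thread()) waits
*   on _callback_start, re-arms _terminal_refcnt and pushes the initial nodes
*   onto _trigger_queue.
* - Worker threads (and the main graph thread) pull nodes from _trigger_queue,
*   run them, and wake idle peers via the _trigger semaphore.
* - A finished node decrements the reference count of each child; a child
*   whose count reaches zero is pushed onto _trigger_queue.
* - When the last terminal node completes, _callback_done is posted and
*   process_graph() returns.
*/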
void
Graph::main_thread ()
{
/* Wait until all worker threads are active */
while (g_atomic_int_get (&_idle_thread_cnt) != _n_workers) {
sched_yield ();
}
/* Wait for initial process callback */
sem_wait (&_callback_start);
/* first time setup */
/* Can't run without a graph */
assert (_graph_nodes.size () > 0);
assert (_init_trigger_list.size () > 0);
assert (_n_terminal_nodes > 0);
/* Bootstrap the trigger-list
* (later this is done by Graph::reached_terminal_node()) */
for (auto i : _init_trigger_list) {
g_atomic_int_inc (&_trigger_queue_size);
_trigger_queue.push_back (i);
}
/* After setup, the main-thread just becomes a normal worker */
worker_thread ();
}
void
Graph::worker_thread ()
{
for (;;) {
GraphNode* to_run = NULL;
if (g_atomic_int_get (&_terminate)) {
return;
}
if (_trigger_queue.dequeue (to_run)) {
/* Wake up idle threads, but at most as many as there is
* work in the trigger queue that can be processed by
* other threads.
* This thread has not yet decreased _trigger_queue_size.
*/
guint idle_cnt = g_atomic_int_get (&_idle_thread_cnt);
guint work_avail = g_atomic_int_get (&_trigger_queue_size);
guint wakeup = std::min (idle_cnt + 1, work_avail);
for (guint i = 1; i < wakeup; ++i) {
sem_post (&_trigger);
}
}
while (!to_run) {
/* Wait for work, fall asleep */
g_atomic_int_inc (&_idle_thread_cnt);
assert (g_atomic_int_get (&_idle_thread_cnt) <= _n_workers);
sem_wait (&_trigger);
if (g_atomic_int_get (&_terminate)) {
return;
}
g_atomic_int_dec_and_test (&_idle_thread_cnt);
/* Try to find some work to do */
_trigger_queue.dequeue (to_run);
}
/* Process the graph-node */
g_atomic_int_dec_and_test (&_trigger_queue_size);
to_run->run ();
}
}
/* Called from a node when all its incoming edges have
* completed processing and the node can run.
* It is added to the "work queue" */
void
Graph::trigger (GraphNode* n)
{
g_atomic_int_inc (&_trigger_queue_size);
_trigger_queue.push_back (n);
}
/* Called from a terminal node (from a Graph worker-thread)
* to indicate it has completed processing.
*
* The thread that processes the last terminal node
* notifies the caller of process_graph(), waits, and kicks off the next process cycle.
*/
void
Graph::reached_terminal_node ()
{
if (g_atomic_int_dec_and_test (&_terminal_refcnt)) {
/* All terminal nodes have completed.
* We're done with this cycle.
*/
assert (g_atomic_int_get (&_trigger_queue_size) == 0);
/* Notify caller */
sem_post (&_callback_done);
/* Ensure that all background threads are idle.
* When freewheeling there may be an immediate restart:
* If there are more threads than CPU cores, some worker-
* threads may only be "on the way" to becoming idle.
*/
while (g_atomic_int_get (&_idle_thread_cnt) != _n_workers) {
sched_yield ();
}
/* Now wait for the next cycle to begin */
sem_wait (&_callback_start);
if (g_atomic_int_get (&_terminate)) {
return;
}
/* Reset terminal reference count */
g_atomic_int_set (&_terminal_refcnt, _n_terminal_nodes);
/* and start the initial nodes */
for (auto i : _init_trigger_list) {
g_atomic_int_inc (&_trigger_queue_size);
_trigger_queue.push_back (i);
}
/* .. continue in worker-thread */
}
}
bool
Graph::start ()
{
/* XXX These really should be realtime threads for audio processing XXX
*
* e.g. jack_client_create_thread(), or setting the priority using
* pthread_attr_setschedpolicy() and pthread_attr_setschedparam()
* and/or mach thread_policy_set()
* (see the create_rt_thread() sketch below this function).
*
* Alternatively, expose Graph::start_worker() and
* Graph::start_main_thread() to the DAW's backend/engine.
*/
/* Start worker threads */
for (int i = 0; i < _n_workers; ++i) {
pthread_t tid;
if (0 == pthread_create (&tid, NULL, Graph::start_worker, this)) {
_worker_threads.push_back (tid);
}
}
_n_workers = _worker_threads.size ();
/* .. and the main thread */
if (pthread_create (&_main_thread, NULL, Graph::start_main_thread, this)) {
terminate ();
return false;
}
/* Breathe */
sched_yield ();
return true;
}
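/* Hypothetical sketch (not used by this example): creating a worker with
* realtime scheduling, as suggested by the note in Graph::start() above.
* The SCHED_FIFO policy and the priority parameter are assumptions; a real
* DAW would typically use its engine's API (e.g. jack_client_create_thread()).
*/
int
create_rt_thread (pthread_t* tid, void* (*start_routine) (void*), void* arg, int priority)
{
pthread_attr_t attr;
struct sched_param parm;
parm.sched_priority = priority;
pthread_attr_init (&attr);
pthread_attr_setschedpolicy (&attr, SCHED_FIFO);
pthread_attr_setschedparam (&attr, &parm);
pthread_attr_setinheritsched (&attr, PTHREAD_EXPLICIT_SCHED);
int rv = pthread_create (tid, &attr, start_routine, arg);
pthread_attr_destroy (&attr);
return rv;
}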
/* Tell all threads to terminate */
void
Graph::terminate ()
{
/* Flag threads to terminate */
g_atomic_int_set (&_terminate, 1);
/* Wake-up sleeping threads */
int tc = g_atomic_int_get (&_idle_thread_cnt);
assert (tc == _n_workers);
for (int i = 0; i < tc; ++i) {
sem_post (&_trigger);
}
/* and the main thread */
sem_post (&_callback_start);
/* join threads */
for (auto i : _worker_threads) {
pthread_join (i, NULL);
}
pthread_join (_main_thread, NULL);
}
/* The actual entry-point to start processing all nodes,
* and wait for them to complete */
void
Graph::process_graph ()
{
printf (" -- START PROCESS --\n");
const int64_t start = g_get_monotonic_time ();
sem_post (&_callback_start);
sem_wait (&_callback_done);
const int64_t end = g_get_monotonic_time ();
printf (" -- END PROCESS - ELAPSED: %.1fms\n", (end - start) / 1e3f);
}
/* ****************************************************************************/
static void
setup_graph (Graph& g)
{
/* Setup an ordered process graph.
*
* https://en.wikipedia.org/wiki/Directed_acyclic_graph
* https://en.wikipedia.org/wiki/Topological_sorting
*/
/* The GraphNode constructor also needs to be changed to take
* a pointer to the node-processing method.
* In this example GraphNode::process() just sleeps to simulate work.
*
* An alternative approach is to subclass
* class YourProcessNode : public GraphNode;
* and override GraphNode::process()
* (see the YourProcessNode sketch after the GraphNode class above).
*/
#if 0
/* Create some example graph
*
* [master]
* ^ ^
* [bus-1] [bus-2]
* ^ ^ ^
* [T1] [T2] [T3]
*
* https://gareus.org/misc/thesis-p8/2017-12-Gareus-Lat.pdf
* Fig. 31, Table 7; page 134, 135
*/
GraphNode* master = g.add_terminal_node ("master");
GraphNode* bus2 = g.add_node ("bus2");
GraphNode* bus1 = g.add_node ("bus1");
GraphNode* track3 = g.add_initial_node ("track3");
GraphNode* track2 = g.add_initial_node ("track2");
GraphNode* track1 = g.add_initial_node ("track1");
g.connect (bus1, master);
g.connect (bus2, master);
g.connect (track1, bus1);
g.connect (track2, bus2);
g.connect (track3, bus2);
#else
/* some more complex graph
* [out-1] [out-2]
* / | \ / \
* [L1-1] [L1-2] [L1-3] [L1-4] [L1-5]
* / \ | \ | / \ /
* [L2-1] [L2-2] [L2-3 ] [L2-4]
* | | \ | / \
* [L3-1] [L3-2] [L3-3] [L3-4] [L3-5]
* | / \ | /
* [L4-1] `[L4-2]
*
*/
GraphNode* out1 = g.add_terminal_node ("out-1");
GraphNode* out2 = g.add_terminal_node ("out-2");
GraphNode* l1_1 = g.add_node ("L1-1");
GraphNode* l1_2 = g.add_node ("L1-2");
GraphNode* l1_3 = g.add_node ("L1-3");
GraphNode* l1_4 = g.add_node ("L1-4");
GraphNode* l1_5 = g.add_node ("L1-5");
GraphNode* l2_1 = g.add_node ("L2-1");
GraphNode* l2_2 = g.add_node ("L2-2");
GraphNode* l2_3 = g.add_node ("L2-3");
GraphNode* l2_4 = g.add_node ("L2-4");
GraphNode* l3_1 = g.add_node ("L3-1");
GraphNode* l3_2 = g.add_node ("L3-2");
GraphNode* l3_3 = g.add_node ("L3-3");
GraphNode* l3_4 = g.add_node ("L3-4");
GraphNode* l3_5 = g.add_node ("L3-5");
GraphNode* l4_1 = g.add_initial_node ("L4-1");
GraphNode* l4_2 = g.add_initial_node ("L4-2");
g.connect (l1_1, out1);
g.connect (l1_2, out1);
g.connect (l1_3, out1);
g.connect (l1_4, out2);
g.connect (l1_5, out2);
g.connect (l2_1, l1_1);
g.connect (l2_2, l1_2);
g.connect (l2_3, l1_2);
g.connect (l2_3, l1_3);
g.connect (l2_3, l1_4);
g.connect (l2_4, l1_4);
g.connect (l2_4, l1_5);
g.connect (l3_1, l2_1);
g.connect (l3_2, l2_2);
g.connect (l3_3, l2_2);
g.connect (l3_4, l2_4);
g.connect (l3_5, l2_4);
g.connect (l3_3, l2_3);
g.connect (l4_1, l3_1);
g.connect (l4_1, l3_2);
g.connect (l4_2, l3_3);
g.connect (l4_2, l3_4);
g.connect (l4_2, l3_5);
#endif
/* The graph has a dedicated setup chain.
* When the process-graph changes, a second graph is built in the background,
* while the current graph continues processing.
*
* rechain() activates the setup graph and drops the previous graph.
* It must not be called concurrently with processing.
*
* i.e. it can only be called before or after ::process_graph().
*/
g.rechain ();
}
int
main (int argc, char** argv)
{
srand (time (NULL));
Graph g (N_WORKERS);
/* start process threads */
g.start ();
/* Add some nodes to process */
setup_graph (g);
/* Process a few times */
for (int i = 0; i < 5; ++i) {
g.process_graph ();
}
/* Cleanup and quit */
g.terminate ();
return 0;
}