Skip to content

Instantly share code, notes, and snippets.

@x42
Created June 29, 2019 13:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save x42/408cb5468914c7c3345f7071055d7502 to your computer and use it in GitHub Desktop.
Save x42/408cb5468914c7c3345f7071055d7502 to your computer and use it in GitHub Desktop.
// gcc `pkg-config --cflags --libs glib-2.0` -pthread semproc.c -o semproc
/* Example to parallel progress a directed acyclic graph as described in
* Chapter 17.3 of http://www.theses.fr/2017PA080116, page 131-135
* https://gareus.org/misc/thesis-p8/2017-12-Gareus-Lat.pdf
*
* (C) 2017, 2019 Robin Gareus <robin@gareus.org>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#define N_WORKERS (3)
#include <assert.h>
#include <glib.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
struct _Graph;
/** A Process Graph Node to process */
typedef struct _GraphNode {
struct _Graph* graph; ///< parent Graph
char* name; ///< node name
/** outgoing edges
* downstream nodes to activate when this node has completed processed
*/
struct _GraphNode** childnodes;
size_t n_childnodes;
/* upstream nodes reference count */
gint init_refcount; ///< number of incoming edges
volatile gint refcount; ///< count-down (unprocessed upstream)
} GraphNode;
/** Graph -- manage threads, schedule work and synchronize entry/exit */
typedef struct _Graph {
/** List of all graph nodes (only used for memory management) */
GraphNode** graph_nodes;
size_t n_graph_nodes;
/** Nodes without incoming edges.
* These run concurrently at the start of each cycle to kick off processing */
GraphNode** init_trigger_list;
size_t n_init_triggers;
/* terminal node reference count */
gint terminal_node_count; ///< number of graph nodes without an outgoing edge
volatile gint terminal_refcnt; ///< remaining unprocessed terminal nodes in this cycle
/* Synchronization with main process callback */
sem_t callback_start;
sem_t callback_done;
/* wake up graph node process threads */
sem_t trigger;
volatile bool terminate; ///< flag to exit, terminate all process-threads
/* these following are protected by _trigger_mutex */
pthread_mutex_t trigger_mutex;
volatile int idle_thread_cnt; ///< number of threads waiting for work
GraphNode** trigger_queue; ///< nodes that can be processed
size_t n_trigger_queue;
size_t trigger_queue_size; ///< max size (pre-allocated array)
} Graph;
/* prototypes */
void Graph_trigger (Graph* self, GraphNode* n);
void Graph_reached_terminal_node (Graph* self);
/* **************************
* GraphNode Implementation *
* *************************/
GraphNode*
GraphNode_new (Graph* g, char const* name)
{
GraphNode* self = (GraphNode*)calloc (1, sizeof (GraphNode));
self->graph = g;
self->init_refcount = 0;
self->refcount = 0;
self->childnodes = NULL;
self->n_childnodes = 0;
self->name = strdup (name);
return self;
}
void
GraphNode_free (GraphNode* self)
{
free (self->childnodes);
free (self->name);
free (self);
}
/* called by an upstream node, when it has completed processing */
void
GraphNode_trigger (GraphNode* self)
{
/* check if we can run */
if (g_atomic_int_dec_and_test (&self->refcount)) {
// printf ("* schedule '%s'\n", self->name);
/* reset reference count for next cycle */
g_atomic_int_set (&self->refcount, self->init_refcount);
/* all nodes that feed this node have completed, so this node be processed now. */
Graph_trigger (self->graph, self);
}
}
/* completed processing this node, notify outgoing edges */
void
GraphNode_finish (GraphNode* self)
{
bool feeds = false;
/* notify downstream nodes that depend on this node */
for (size_t i = 0; i < self->n_childnodes; ++i) {
// printf ("- %s done activates %s\n", self->name, self->childnodes[i]->name);
GraphNode_trigger (self->childnodes[i]);
feeds = true;
}
/* if there are no outgoing edges, this is a terminal node */
if (!feeds) {
/* notify parent graph */
Graph_reached_terminal_node (self->graph);
}
}
/* fake "Process Node" - sleep some random time 1..10ms */
void
GraphNode_process (GraphNode const* self)
{
int dly = 1 + rand () % 10;
printf ("Thread 0x%zx: process '%s' (%dms)\n", pthread_self (), self->name, dly);
g_usleep (1000 * dly);
}
void
GraphNode_run (GraphNode* self)
{
GraphNode_process (self);
GraphNode_finish (self);
}
/* setup method to add outgoing graph edges.
* Must be called during initialization only */
void
GraphNode_add_feeds (GraphNode* self, GraphNode* g)
{
for (size_t i = 0; i < self->n_childnodes; ++i) {
if (self->childnodes[i] == g) {
return;
}
}
self->childnodes = (GraphNode**)realloc (self->childnodes, (1 + self->n_childnodes) * sizeof (GraphNode*));
self->childnodes[self->n_childnodes++] = g;
}
/* setup method, to add incoming graph edges
* Must be called during initialization only */
void
GraphNode_add_depends (GraphNode* self)
{
++self->init_refcount;
self->refcount = self->init_refcount;
}
/* **********************
* Graph Implementation *
* *********************/
void
Graph_init (Graph* self)
{
sem_init (&self->callback_start, 0, 0);
sem_init (&self->callback_done, 0, 0);
sem_init (&self->trigger, 0, 0);
pthread_mutex_init (&self->trigger_mutex, NULL);
self->terminate = false;
self->idle_thread_cnt = 0;
self->terminal_refcnt = 0;
self->terminal_node_count = 0;
self->graph_nodes = NULL;
self->n_graph_nodes = 0;
self->init_trigger_list = NULL;
self->n_init_triggers = 0;
self->n_trigger_queue = 0;
self->trigger_queue_size = 0;
self->trigger_queue = NULL;
}
void
Graph_free (Graph* self)
{
for (size_t n = 0; n < self->n_graph_nodes; ++n) {
GraphNode_free (self->graph_nodes[n]);
}
free (self->graph_nodes);
pthread_mutex_destroy (&self->trigger_mutex);
sem_destroy (&self->callback_start);
sem_destroy (&self->callback_done);
sem_destroy (&self->trigger);
}
#ifndef MIN
#define MIN(A, B) (((A) < (B)) ? (A) : (B))
#endif
void*
Graph_worker_thread (void* g)
{
Graph* self = (Graph*)g;
do {
GraphNode* to_run = NULL;
pthread_mutex_lock (&self->trigger_mutex);
if (self->terminate) {
pthread_mutex_unlock (&self->trigger_mutex);
return 0;
}
if (self->n_trigger_queue > 0) {
to_run = self->trigger_queue[--self->n_trigger_queue];
}
int wakeup = MIN (self->idle_thread_cnt, self->n_trigger_queue);
/* wake up threads */
self->idle_thread_cnt -= wakeup;
for (int i = 0; i < wakeup; i++) {
sem_post (&self->trigger);
}
while (!to_run) {
/* wait for work, fall asleep */
self->idle_thread_cnt += 1;
assert (self->idle_thread_cnt <= N_WORKERS);
pthread_mutex_unlock (&self->trigger_mutex);
sem_wait (&self->trigger);
if (self->terminate) {
return 0;
}
/* try to find some work to do */
pthread_mutex_lock (&self->trigger_mutex);
if (self->n_trigger_queue > 0) {
to_run = self->trigger_queue[--self->n_trigger_queue];
}
}
pthread_mutex_unlock (&self->trigger_mutex);
/* process graph-node */
GraphNode_run (to_run);
} while (!self->terminate);
return 0;
}
void*
Graph_main_thread (void* g)
{
Graph* self = (Graph*)g;
/* wait for initial process callback */
sem_wait (&self->callback_start);
pthread_mutex_lock (&self->trigger_mutex);
/* Can't run without a graph */
assert (self->n_graph_nodes > 0);
assert (self->n_init_triggers > 0);
assert (self->terminal_node_count > 0);
/* bootstrap trigger-list.
* (later this is done by Graph_reached_terminal_node)*/
for (size_t i = 0; i < self->n_init_triggers; ++i) {
assert (self->n_trigger_queue < self->trigger_queue_size);
self->trigger_queue[self->n_trigger_queue++] = self->init_trigger_list[i];
}
pthread_mutex_unlock (&self->trigger_mutex);
/* after setup, the main-thread just becomes a normal worker */
return Graph_worker_thread (g);
}
/* called from a node when all its incoming edges have
* completed processing and the node can run.
* It is added to the "work queue" */
void
Graph_trigger (Graph* self, GraphNode* n)
{
pthread_mutex_lock (&self->trigger_mutex);
assert (self->n_trigger_queue < self->trigger_queue_size);
self->trigger_queue[self->n_trigger_queue++] = n;
pthread_mutex_unlock (&self->trigger_mutex);
}
/* called from a terminal node (from the Graph worked-thread)
* to indicate it has completed processing.
*
* The thread of the last terminal node that reaches here
* will inform the main-thread, wait, and kick off the next process cycle.
*/
void
Graph_reached_terminal_node (Graph* self)
{
if (g_atomic_int_dec_and_test (&self->terminal_refcnt)) {
/* all terminal nodes have completed,
* we're done with this cycle.
*/
sem_post (&self->callback_done);
/* now wait for the next cycle to begin */
sem_wait (&self->callback_start);
if (self->terminate) {
return;
}
/* reset terminal reference count */
g_atomic_int_set (&self->terminal_refcnt, self->terminal_node_count);
/* and start the initial nodes */
pthread_mutex_lock (&self->trigger_mutex);
for (size_t i = 0; i < self->n_init_triggers; ++i) {
assert (self->n_trigger_queue < self->trigger_queue_size);
self->trigger_queue[self->n_trigger_queue++] = self->init_trigger_list[i];
}
pthread_mutex_unlock (&self->trigger_mutex);
/* continue in worker-thread */
}
}
/* tell all threads to terminate */
void
Graph_terminate (Graph* self)
{
pthread_mutex_lock (&self->trigger_mutex);
self->terminate = true;
self->init_trigger_list = NULL;
self->n_init_triggers = 0;
self->trigger_queue = NULL;
self->n_trigger_queue = 0;
free (self->init_trigger_list);
free (self->trigger_queue);
int tc = self->idle_thread_cnt;
assert (tc == N_WORKERS);
pthread_mutex_unlock (&self->trigger_mutex);
/* wake-up sleeping threads */
for (int i = 0; i < tc; ++i) {
sem_post (&self->trigger);
}
/* and the main thread */
pthread_mutex_lock (&self->trigger_mutex);
sem_post (&self->callback_start);
pthread_mutex_unlock (&self->trigger_mutex);
}
/* the actual entry-point to start processing all nodes
* and wait for them to complete */
void
Graph_process_graph (Graph* self)
{
printf (" -- START PROCESS --\n");
const int64_t start = g_get_monotonic_time ();
sem_post (&self->callback_start);
sem_wait (&self->callback_done);
const int64_t end = g_get_monotonic_time ();
printf (" -- END PROCESS - ELAPSED: %.1fms\n", (end - start) / 1e3f);
}
/* ************************************************
* setup, create and connect nodes
* These must only be called during initialization
* ***********************************************/
GraphNode*
Graph_add_node (Graph* self, const char* name)
{
free (self->trigger_queue);
self->trigger_queue = (GraphNode**)malloc (++self->trigger_queue_size * sizeof (GraphNode*));
self->graph_nodes = (GraphNode**)realloc (self->graph_nodes, (1 + self->n_graph_nodes) * sizeof (GraphNode*));
self->graph_nodes[self->n_graph_nodes] = GraphNode_new (self, name);
return self->graph_nodes[self->n_graph_nodes++];
}
GraphNode*
Graph_add_terminal_node (Graph* self, const char* name)
{
++self->terminal_node_count;
self->terminal_refcnt = self->terminal_node_count;
return Graph_add_node (self, name);
}
GraphNode*
Graph_add_initial_node (Graph* self, const char* name)
{
self->init_trigger_list = (GraphNode**)realloc (self->init_trigger_list, (1 + self->n_init_triggers) * sizeof (GraphNode*));
self->init_trigger_list[self->n_init_triggers] = Graph_add_node (self, name);
return self->init_trigger_list[self->n_init_triggers++];
}
static void
connect (GraphNode* from, GraphNode* to)
{
GraphNode_add_feeds (from, to);
GraphNode_add_depends (to);
}
/* ****************************************************************************/
static void
setup_graph (Graph* g)
{
#if 1
/* Create some example graph
*
* [master-out]
* ^ ^
* [bus-1] [bus-2]
* ^ ^ ^
* [T1] [T2] [T3]
*
* https://gareus.org/misc/thesis-p8/2017-12-Gareus-Lat.pdf
* Fig. 31, Table 7; page 134, 135
*/
GraphNode* master = Graph_add_terminal_node (g, "master");
GraphNode* bus2 = Graph_add_node (g, "bus2");
GraphNode* bus1 = Graph_add_node (g, "bus1");
GraphNode* track3 = Graph_add_initial_node (g, "track3");
GraphNode* track2 = Graph_add_initial_node (g, "track2");
GraphNode* track1 = Graph_add_initial_node (g, "track1");
connect (bus1, master);
connect (bus2, master);
connect (track1, bus1);
connect (track2, bus2);
connect (track3, bus2);
#else
/* some more complex graph
* [out-1] [out-2]
* / | \ / \
* [L1-1] [L1-2] [L1-3] [L1-4] [L1-5]
* / \ | \ | / \ /
* [L2-1] [L2-2] [L2-3 ] [L2-4]
* | | \ | / \
* [L3-1] [L3-2] [L3-3] [L3-4] [L3-5]
*
*/
GraphNode* out1 = Graph_add_terminal_node (g, "out-1");
GraphNode* out2 = Graph_add_terminal_node (g, "out-2");
GraphNode* l1_1 = Graph_add_node (g, "L1-1");
GraphNode* l1_2 = Graph_add_node (g, "L1-2");
GraphNode* l1_3 = Graph_add_node (g, "L1-3");
GraphNode* l1_4 = Graph_add_node (g, "L1-4");
GraphNode* l1_5 = Graph_add_node (g, "L1-5");
GraphNode* l2_1 = Graph_add_node (g, "L2-1");
GraphNode* l2_2 = Graph_add_node (g, "L2-2");
GraphNode* l2_3 = Graph_add_node (g, "L2-3");
GraphNode* l2_4 = Graph_add_node (g, "L2-4");
GraphNode* l3_1 = Graph_add_initial_node (g, "L3-1");
GraphNode* l3_2 = Graph_add_initial_node (g, "L3-2");
GraphNode* l3_3 = Graph_add_initial_node (g, "L3-3");
GraphNode* l3_4 = Graph_add_initial_node (g, "L3-4");
GraphNode* l3_5 = Graph_add_initial_node (g, "L3-5");
connect (l1_1, out1);
connect (l1_2, out1);
connect (l1_3, out1);
connect (l1_4, out2);
connect (l1_5, out2);
connect (l2_1, l1_1);
connect (l2_2, l1_2);
connect (l2_3, l1_2);
connect (l2_3, l1_3);
connect (l2_3, l1_4);
connect (l2_4, l1_4);
connect (l2_4, l1_5);
connect (l3_1, l2_1);
connect (l3_2, l2_2);
connect (l3_3, l2_2);
connect (l3_4, l2_4);
connect (l3_5, l2_4);
connect (l3_3, l2_3);
#endif
}
int
main (int argc, char** argv)
{
/* Number of worker threads to start (in addition to main thread). Can be 0) */
srand (time (NULL));
Graph g;
pthread_t main_thread;
pthread_t workers[N_WORKERS];
/* initialize graph, and add some nodes to process */
Graph_init (&g);
setup_graph (&g);
/* create worker threads */
for (int i = 0; i < N_WORKERS; ++i) {
pthread_create (&workers[i], NULL, &Graph_worker_thread, &g);
}
/* and the main thread */
pthread_create (&main_thread, NULL, &Graph_main_thread, &g);
/* breathe */
sched_yield ();
/* process a few times */
for (int i = 0; i < 5; ++i) {
Graph_process_graph (&g);
}
/* cleanup and quit */
Graph_terminate (&g);
for (int i = 0; i < N_WORKERS; ++i) {
pthread_join (workers[i], NULL);
}
pthread_join (main_thread, NULL);
Graph_free (&g);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment