Skip to content

Instantly share code, notes, and snippets.

@b4n
Last active August 29, 2015 14:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save b4n/b1135eaaac55224e9c56 to your computer and use it in GitHub Desktop.
Save b4n/b1135eaaac55224e9c56 to your computer and use it in GitHub Desktop.
A shot at the feasibility and implementation complexity of thread-based asynchronous TM source file updating
/*
* Threaded TM updates.
*
* One worker thread processes parsing jobs.
*
* A job represents the update of one single source file.
* A group represents the user's update request, that can consist of several update jobs.
*
* The job<->group relation is many-to-many: a group can consist of multiple jobs, and a job can be
* shared by several groups. This allows bulk updates to be performed as a single group, while still
* letting higher-priority jobs be processed in between.
*/
/* Callback run by finish_job_in_idle() (dispatched with g_idle_add()) once every job of an
 * update group has been processed. @data is the user data given to update_group_new(). */
typedef void (*TMUpdateFinishedCallback) (gpointer data);
/* An update group: one user-level update request, possibly made of several jobs. */
typedef struct
{
/* number of jobs still to be processed before the group is complete; decremented by the
 * worker thread, the group is handed to the main loop when it drops below 1 */
guint jobs_remaning;
/* jobs this group has to update. this may not contain the complete list that
* the job requested if some jobs were already finished by another group */
GPtrArray *jobs;
/* called with @data when the group completes; may be NULL */
TMUpdateFinishedCallback callback;
gpointer data;
} TMUpdateGroup;
/* An update job: the parsing of one single source file, possibly shared by several groups. */
typedef struct
{
/* TMUpdateGroup*s waiting for this job; a slot is set to NULL once that group has been
 * handed off, so NULL entries must be skipped */
GPtrArray *groups;
/* Jobs with a larger value are processed first: the queue is kept sorted by ascending
 * priority and the worker pops from its tail. */
gint priority; /* FIXME: GLib's G_PRIORITY_... is smallest-is-highest. it's weird, but
supporting those names would probably be handy. Or define ours */
TMSourceFile *source_file; /* FIXME: this has to be read-only */
/* TRUE to parse @buffer instead of the file itself */
gboolean buffer_update;
/* owned by the job and released with g_free() in update_job_free() */
guchar *buffer;
gsize buffer_length;
GPtrArray *tags_array; /*< parsed tags as a result of the job */
} TMUpdateJob;
/* A job queue shared between the main thread and the worker thread. */
typedef struct
{
/* protects @queue */
GMutex mutex;
/* signalled whenever an item is pushed to @queue */
GCond cond;
/* TMUpdateJob*s sorted by ascending priority; a NULL item asks the worker to stop */
GQueue queue;
} TMJobQueue;
/* NOTE(review): must point to an initialized queue before any job is pushed or the worker
 * thread is started. */
static TMJobQueue *TM_job_queue = NULL;
static GThread *TM_job_thread = NULL;
/* Allocates a job that will parse @source_file at @priority.
 * The job holds its own reference on @source_file, starts with no group attached, no buffer
 * and no parsing result. */
static TMUpdateJob *update_job_new(gint priority, TMSourceFile *source_file)
{
	/* zero-filling already clears buffer_update, buffer, buffer_length and tags_array */
	TMUpdateJob *new_job = g_new0(TMUpdateJob, 1);

	new_job->groups = g_ptr_array_new();
	new_job->priority = priority;
	new_job->source_file = tm_source_file_ref(source_file);

	return new_job;
}
/* Destroys @job: its group array, its reference on the source file and its buffer.
 * A warning is emitted for every group slot that is still set. The parsed tags_array is NOT
 * freed here. */
static void update_job_free(TMUpdateJob *job)
{
	guint n = job->groups->len;

	while (n-- > 0)
	{
		if (g_ptr_array_index(job->groups, n) != NULL)
			g_warning("Destroying a job with remaining groups");
	}

	g_ptr_array_free(job->groups, TRUE);
	tm_source_file_unref(job->source_file);
	g_free(job->buffer);
	g_free(job);
}
/* Allocates an empty update group that will call @callback with @data once complete.
 * The caller is expected to set jobs_remaning to the number of jobs it attaches. */
static TMUpdateGroup *update_group_new(TMUpdateFinishedCallback callback, gpointer data)
{
	TMUpdateGroup *grp = g_new(TMUpdateGroup, 1);

	grp->callback = callback;
	grp->data = data;
	grp->jobs = g_ptr_array_new();
	grp->jobs_remaning = 0;

	return grp;
}
/* Destroys @group along with every job collected in its jobs array, in array order. */
static void update_group_free(TMUpdateGroup *group)
{
	GPtrArray *owned_jobs = group->jobs;
	guint idx = 0;

	while (idx < owned_jobs->len)
		update_job_free(g_ptr_array_index(owned_jobs, idx++));

	g_ptr_array_free(owned_jobs, TRUE);
	g_free(group);
}
/* Blocks until @queue holds at least one item, then removes and returns the item at its tail,
 * i.e. the highest-priority job since job_queue_push() keeps the queue sorted by ascending
 * priority. A NULL item is returned as-is: it is the request for the worker thread to stop. */
static TMUpdateJob *job_queue_pop(TMJobQueue *queue)
{
	TMUpdateJob *job;

	g_mutex_lock(&queue->mutex);
	/* loop to cope with spurious wakeups; test the link rather than the data because a NULL
	 * job is a legitimate (termination) item */
	while (g_queue_peek_tail_link(&queue->queue) == NULL)
		g_cond_wait(&queue->cond, &queue->mutex);
	job = g_queue_pop_tail(&queue->queue);
	/* unlock the mutex we locked (the original passed the TMJobQueue itself) */
	g_mutex_unlock(&queue->mutex);

	return job;
}
/* Inserts @new_job into @queue, keeping the queue sorted by ascending priority, and wakes the
 * worker thread. Takes ownership of @new_job.
 *
 * If a job for the same source file (same object or same file name) is already queued, it is
 * replaced: @new_job inherits the old job's groups and the higher of both priorities, and the
 * old job is freed. */
static void job_queue_push(TMJobQueue *queue, TMUpdateJob *new_job)
{
	GList *node;

	g_mutex_lock(&queue->mutex);

	/* find and remove a duplicate job */
	for (node = queue->queue.head; node; node = node->next)
	{
		TMUpdateJob *job = node->data;

		if (job->source_file == new_job->source_file ||
			strcmp(job->source_file->file_name, new_job->source_file->file_name) == 0)
		{
			guint i;

			/* inherit the old job's groups, clearing them from the old job so that freeing
			 * it neither warns nor leaves dangling references */
			for (i = 0; i < job->groups->len; i++)
			{
				g_ptr_array_add(new_job->groups, job->groups->pdata[i]);
				job->groups->pdata[i] = NULL;
			}
			/* and possibly its priority */
			new_job->priority = MAX(new_job->priority, job->priority);
			update_job_free(job);
			g_queue_delete_link(&queue->queue, node);
			break; /* there can only be one duplicate, so stop here */
		}
	}

	/* find the first job with a priority >= to the new job's */
	for (node = queue->queue.head; node; node = node->next)
	{
		TMUpdateJob *job = node->data;

		if (new_job->priority <= job->priority)
			break;
	}
	/* if found, insert the new job before it: as the worker pops from the tail, this makes
	 * equal-priority jobs run in FIFO order. otherwise, the queue is either empty or has only
	 * lower priority jobs, so push to the end. */
	if (node)
		g_queue_insert_before(&queue->queue, node, new_job);
	else
		g_queue_push_tail(&queue->queue, new_job);

	g_cond_signal(&queue->cond);
	g_mutex_unlock(&queue->mutex);
}
/* Idle callback (scheduled with g_idle_add()) that applies the results of a completed update
 * group, runs the group's callback and frees the group together with its jobs.
 * For every job, the source file's old tags are removed from the workspace and replaced by the
 * freshly parsed ones. A single job is merged with tm_workspace_merge_tags(); several jobs
 * trigger tm_workspace_recreate_tags_array() instead. Returns FALSE so it runs only once. */
static gboolean finish_job_in_idle(gpointer data)
{
	TMUpdateGroup *group = data;
	const guint n_jobs = group->jobs->len;
	guint idx;

	/* FIXME: properly handle the case a source file has been removed from the workspace
	 * before we get here, e.g. we shouldn't merge its tags or update for it */
	for (idx = 0; idx < n_jobs; idx++)
	{
		TMUpdateJob *job = g_ptr_array_index(group->jobs, idx);
		TMSourceFile *file = job->source_file;

		tm_tags_remove_file_tags(file, theWorkspace->tags_array);
		/* NOTE(review): the previous file->tags_array is overwritten without being freed
		 * here; confirm it is released elsewhere or this leaks. */
		file->tags_array = job->tags_array;
		if (n_jobs == 1)
			tm_workspace_merge_tags(&theWorkspace->tags_array, file->tags_array);
	}
	if (n_jobs > 1)
		tm_workspace_recreate_tags_array();

	if (group->callback != NULL)
		group->callback(group->data);
	update_group_free(group);

	return FALSE;
}
/* Worker thread body: parses jobs popped from @data (a TMJobQueue *) until a NULL job is popped.
 *
 * After parsing a job, every attached group is decremented. The job is appended to the jobs
 * array of each group visited before one of them completes. Once a group completes, the job is
 * removed from the still-pending groups, so only a completing group owns (and later frees) it.
 * Completed groups are dispatched to finish_job_in_idle() with g_idle_add() only at the very
 * end, because from that point on the main thread may free them and the job they own. */
static gpointer update_thread(gpointer data)
{
	TMJobQueue *queue = data;
	TMUpdateJob *job;

	while ((job = job_queue_pop(queue)) != NULL)
	{
		GPtrArray *finished_groups = g_ptr_array_new();
		gboolean job_finished = FALSE;
		guint i;

		if (job->buffer_update)
			job->tags_array = tm_source_file_buffer_parse_to_array(job->source_file,
				job->buffer, job->buffer_length);
		else
			job->tags_array = tm_source_file_parse_to_array(job->source_file);
		tm_tags_sort(job->tags_array, NULL, FALSE, TRUE);

		for (i = 0; i < job->groups->len; i++)
		{
			TMUpdateGroup *group = job->groups->pdata[i];

			if (! group)
				continue;
			/* add the job to the result jobs of the groups; it is removed again below
			 * from the ones that are not finished by this job */
			if (! job_finished)
				g_ptr_array_add(group->jobs, job);
			if (--(group->jobs_remaning) < 1)
			{
				job->groups->pdata[i] = NULL;
				g_ptr_array_add(finished_groups, group);
				job_finished = TRUE;
			}
		}

		/* if the job was finished by a group, remove it from the other groups. They have
		 * already accounted for this job, so also drop them from the job to avoid spurious
		 * "remaining groups" warnings when the owning group frees it. */
		if (job_finished)
		{
			for (i = 0; i < job->groups->len; i++)
			{
				TMUpdateGroup *group = job->groups->pdata[i];

				if (group)
				{
					g_ptr_array_remove_fast(group->jobs, job);
					job->groups->pdata[i] = NULL;
				}
			}
		}
		/* NOTE(review): when no group completes, the job stays in every pending group's jobs
		 * array, and each of those groups will free it in update_group_free() (double free if
		 * there is more than one). Jobs probably need reference counting. */

		/* only now hand the finished groups to the main thread: neither they nor @job may be
		 * touched by this thread after this point */
		for (i = 0; i < finished_groups->len; i++)
			g_idle_add(finish_job_in_idle, finished_groups->pdata[i]);
		g_ptr_array_free(finished_groups, TRUE);
	}

	return NULL;
}
static void update_thread_init(void)
{
TM_job_thread = g_thread_create(update_thread, TM_job_queue, TRUE, NULL);
}
/* Stops the worker thread: drops every queued job, frees the groups that no longer wait for
 * any job (without running their callbacks), then asks the worker to exit and joins it. */
static void update_thread_destroy(void)
{
	GList *node;

	/* nothing to stop (also avoids joining a NULL thread) */
	g_return_if_fail(TM_job_thread != NULL);

	/* FIXME: while this is nice to avoid blocking on useless remaining jobs when quitting, is it
	 * really OK to throw away jobs and the forget some callbacks?
	 * It probably is, as I doubt the main loop will keep running until there are no more
	 * GSources, so anyway the callback may not run even if submitted when we quit.
	 * NOTE: this is done the nice way. If we really don't care about the jobs or anything, we
	 * could just let the thread die with the process. */
	/* NOTE(review): the worker may be parsing a job concurrently and updates groups'
	 * jobs_remaning/jobs without holding the lock, so a group shared with that job can race
	 * with the loop below. */

	/* empty the queue and add a NULL job to notify the thread */
	g_mutex_lock(&TM_job_queue->mutex);
	for (node = TM_job_queue->queue.head; node; node = node->next)
	{
		TMUpdateJob *job = node->data;
		guint i;

		for (i = 0; i < job->groups->len; i++)
		{
			TMUpdateGroup *group = job->groups->pdata[i];

			if (! group)
				continue;
			/* the job no longer references this group; also avoids the free warning */
			job->groups->pdata[i] = NULL;
			if (--(group->jobs_remaning) < 1)
				update_group_free(group);
		}
		update_job_free(job);
	}
	g_queue_clear(&TM_job_queue->queue);
	/* push a NULL job to notify the thread */
	g_queue_push_tail(&TM_job_queue->queue, NULL);
	g_cond_signal(&TM_job_queue->cond);
	g_mutex_unlock(&TM_job_queue->mutex);

	g_thread_join(TM_job_thread);
	TM_job_thread = NULL;
}
/* Queues an asynchronous re-parse of @source_file.
 * @update_workspace: currently unused.
 * @priority: jobs with a larger value are processed first.
 * @callback: called with @data from an idle callback once the update completes; may be NULL.
 * The callback parameter is a plain function pointer; the former TMUpdateFinishedCallback *
 * type did not match what update_group_new() expects. */
void tm_workspace_update_source_file_async(TMSourceFile *source_file, gboolean update_workspace,
		gint priority, TMUpdateFinishedCallback callback, gpointer data)
{
	TMUpdateJob *job = update_job_new(priority, source_file);
	/* a group, not a job: jobs have no jobs_remaning field */
	TMUpdateGroup *group = update_group_new(callback, data);

	/* update_job_new() already leaves buffer_update FALSE: parse the file itself */
	g_ptr_array_add(job->groups, group);
	group->jobs_remaning = 1;
	job_queue_push(TM_job_queue, job);
}
/* like tm_workspace_update_source_file_buffer_async() but takes ownership of the buffer, to avoid
 * memory duplication. The buffer must be valid to free with g_free(). */
void tm_workspace_update_source_file_buffer_steal_async(TMSourceFile *source_file, guchar *buffer, gsize len,
		gboolean update_workspace, gint priority,
		TMUpdateFinishedCallback callback, gpointer data)
{
	TMUpdateJob *job = update_job_new(priority, source_file);
	/* a group, not a job: jobs have no jobs_remaning field */
	TMUpdateGroup *group = update_group_new(callback, data);

	job->buffer_update = TRUE;
	job->buffer = buffer;
	job->buffer_length = len;
	g_ptr_array_add(job->groups, group);
	group->jobs_remaning = 1;
	job_queue_push(TM_job_queue, job);
}

/* Queues an asynchronous parse of @len bytes of @buffer as the contents of @source_file.
 * The buffer is copied, so the caller keeps ownership of it.
 * @update_workspace: currently unused.
 * @priority: jobs with a larger value are processed first.
 * @callback: called with @data from an idle callback once the update completes; may be NULL. */
void tm_workspace_update_source_file_buffer_async(TMSourceFile *source_file, const guchar *buffer, gsize len,
		gboolean update_workspace, gint priority,
		TMUpdateFinishedCallback callback, gpointer data)
{
	/* g_memdup() takes a guint size: larger buffers would be truncated while buffer_length
	 * kept the full @len, making the parser read past the copy. g_memdup2() (GLib >= 2.68)
	 * would lift this limit. */
	g_return_if_fail(len <= G_MAXUINT);

	tm_workspace_update_source_file_buffer_steal_async(source_file, g_memdup(buffer, (guint) len), len,
		update_workspace, priority, callback, data);
}
/* Queues an asynchronous re-parse of every TMSourceFile in @source_files as a single group.
 * @update_workspace: currently unused.
 * @priority: jobs with a larger value are processed first.
 * @callback: called with @data from an idle callback once all files are updated; may be NULL.
 * With an empty array the group is completed right away (still from an idle callback). */
void tm_workspace_update_source_files_async(GPtrArray *source_files, gboolean update_workspace,
		gint priority, TMUpdateFinishedCallback callback, gpointer data)
{
	/* a group, not a job: jobs have no jobs_remaning field */
	TMUpdateGroup *group = update_group_new(callback, data);
	guint i;

	/* no job would ever decrement the group, so it would never complete (leaking it and never
	 * calling back): finish it from an idle callback, like the non-empty case */
	if (source_files->len == 0)
	{
		g_idle_add(finish_job_in_idle, group);
		return;
	}

	/* as we push each new job directly in the queue without a surrounding lock,
	 * we need not to modify the group in the loop */
	/* FIXME: should this manually lock the queue, push all jobs and then notify the condition?
	 * it would be a little more optimized, but would probably require duplicating a part of
	 * job_queue_push() which is a non-trivial function */
	group->jobs_remaning = source_files->len;
	for (i = 0; i < source_files->len; i++)
	{
		TMUpdateJob *job = update_job_new(priority, source_files->pdata[i]);

		/* update_job_new() already leaves buffer_update FALSE: parse the file itself */
		g_ptr_array_add(job->groups, group);
		job_queue_push(TM_job_queue, job);
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment