Skip to content

Instantly share code, notes, and snippets.

@JasonLG1979
Last active August 28, 2016 22:18
Show Gist options
  • Save JasonLG1979/7ee002df45b08374c8c6e146f22211c9 to your computer and use it in GitHub Desktop.
Save JasonLG1979/7ee002df45b08374c8c6e146f22211c9 to your computer and use it in GitHub Desktop.
An asynchronous cancellable hybrid priority/fifo queued thread worker decorator(that's a mouthfull,lol!!!)
#
# Copyright (C) 2016 Jason Gray <jasonlevigray3@gmail.com>
#
#This program is free software: you can redistribute it and/or modify it
#under the terms of the GNU General Public License version 3, as published
#by the Free Software Foundation.
#
#This program is distributed in the hope that it will be useful, but
#WITHOUT ANY WARRANTY; without even the implied warranties of
#MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
#PURPOSE. See the GNU General Public License for more details.
#
#You should have received a copy of the GNU General Public License along
#with this program. If not, see <http://www.gnu.org/licenses/>.
### END LICENSE
import threading
import queue
from gi.repository import GLib, Gio
__all__ = ['GLib_async_queue']
class Result:
def __init__(self, condition, result):
self._result = result
self._condition = condition
def get(self):
return self._result
def release(self):
with self._condition:
self._condition.notify()
return self._result
class Worker(threading.Thread):
def __init__(self):
super().__init__()
self.queue = queue.PriorityQueue()
self.fifo_priority = 0
self.daemon = True
self.start()
def run(self):
while True:
priority, _, f, args, kwargs, on_done, cancellable = self.queue.get()
if not cancellable.is_cancelled():
result = f(*args, **kwargs)
if on_done:
condition = threading.Condition()
GLib.idle_add(on_done, Result(condition, result), priority=priority)
with condition:
condition.wait()
self.queue.task_done()
def queue_task(self, priority, f, args, kwargs, on_done, cancellable):
self.fifo_priority += 1
self.queue.put_nowait((priority, self.fifo_priority, f, args, kwargs, on_done, cancellable))
worker = Worker()
def GLib_async_queue(on_done=None, priority=GLib.PRIORITY_DEFAULT_IDLE):
def wrapper(f):
def run(*args, **kwargs):
cancellable = Gio.Cancellable()
worker.queue_task(priority, f, args, kwargs, on_done, cancellable)
return cancellable
return run
return wrapper
# # What it is #
# An asynchronous cancellable hybrid priority/fifo queued thread worker decorator(that's a mouthfull,lol!!!)
# A single worker thread that runs asynchronous to the main thread that allows execution in order
# of priority or FIFO. In addition any queued task that has not yet started can be cancelled.
#
# # How it works #
# Tasks are queued with an optional "on_done" callback and "priority". If no priority is given the default
# GLib.PRIORITY_DEFAULT_IDLE will be used.
#
# The tasks will be executed in order of priority, tasks of the same priority will be executed in FIFO order.
# The queued task will return a Gio.Cancellable that can be used to cancel the task at any time before it starts.
#
# If an "on_done" callback is provided the results of the work done in the worker thread will be passed to the
# "on_done" callback. A threading.Condition then puts the worker thread to sleep if an "on_done" callback is provided
# to allow for the cancellation of other queued tasks if desired. The results can be retrieved by calling
# result.get() or result.release() get returns the actual results and release returns the actual results and wakes the
# the worker thread back up. get is usefull if you would like to evaluate the results to decide if you would like to cancel
# other tasks otherwise if you don't care to cancel any tasks just use result.release().
#
# # Non-working example code #
#
# class DoesntWork:
# def __init__(self):
# self.task_b = None
# self.async_a()
# self.async_b()
# self.async_c()
#
# def async_a(self):
# def on_done(result):
# a = result.get()#get the results without waking the worker thread up yet
# if a is not None:
# self.task_b.cancel()#cancel a task while the worker thread is asleep
# result.release()#wake the worker thread back up
# #do cool stuff with a
#
# @GLib_async_queue(on_done=on_done)
# def async():
# try:
# #stuff that might fail
# except:
# return None
#
# async()#We don't need the return value.
#
# def async_b(self):
# def on_done(result):
# b = result.release()#wake the worker thread back up and return the results
# #do cool stuff with b
#
# @GLib_async_queue(on_done=on_done)
# def async():
# #do stuff
#
# self.task_b = async()#We need the return value.
#
# @GLib_async_queue()
# def async_c(self):
# #do stuff in the worker thread that we don't care what the results are
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment