Last active
August 28, 2016 22:18
-
-
Save JasonLG1979/7ee002df45b08374c8c6e146f22211c9 to your computer and use it in GitHub Desktop.
An asynchronous cancellable hybrid priority/fifo queued thread worker decorator(that's a mouthfull,lol!!!)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# Copyright (C) 2016 Jason Gray <jasonlevigray3@gmail.com> | |
# | |
#This program is free software: you can redistribute it and/or modify it | |
#under the terms of the GNU General Public License version 3, as published | |
#by the Free Software Foundation. | |
# | |
#This program is distributed in the hope that it will be useful, but | |
#WITHOUT ANY WARRANTY; without even the implied warranties of | |
#MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR | |
#PURPOSE. See the GNU General Public License for more details. | |
# | |
#You should have received a copy of the GNU General Public License along | |
#with this program. If not, see <http://www.gnu.org/licenses/>. | |
### END LICENSE | |
import threading | |
import queue | |
from gi.repository import GLib, Gio | |
__all__ = ['GLib_async_queue'] | |
class Result: | |
def __init__(self, condition, result): | |
self._result = result | |
self._condition = condition | |
def get(self): | |
return self._result | |
def release(self): | |
with self._condition: | |
self._condition.notify() | |
return self._result | |
class Worker(threading.Thread): | |
def __init__(self): | |
super().__init__() | |
self.queue = queue.PriorityQueue() | |
self.fifo_priority = 0 | |
self.daemon = True | |
self.start() | |
def run(self): | |
while True: | |
priority, _, f, args, kwargs, on_done, cancellable = self.queue.get() | |
if not cancellable.is_cancelled(): | |
result = f(*args, **kwargs) | |
if on_done: | |
condition = threading.Condition() | |
GLib.idle_add(on_done, Result(condition, result), priority=priority) | |
with condition: | |
condition.wait() | |
self.queue.task_done() | |
def queue_task(self, priority, f, args, kwargs, on_done, cancellable): | |
self.fifo_priority += 1 | |
self.queue.put_nowait((priority, self.fifo_priority, f, args, kwargs, on_done, cancellable)) | |
worker = Worker() | |
def GLib_async_queue(on_done=None, priority=GLib.PRIORITY_DEFAULT_IDLE): | |
def wrapper(f): | |
def run(*args, **kwargs): | |
cancellable = Gio.Cancellable() | |
worker.queue_task(priority, f, args, kwargs, on_done, cancellable) | |
return cancellable | |
return run | |
return wrapper | |
# # What it is # | |
# An asynchronous cancellable hybrid priority/fifo queued thread worker decorator(that's a mouthfull,lol!!!) | |
# A single worker thread that runs asynchronous to the main thread that allows execution in order | |
# of priority or FIFO. In addition any queued task that has not yet started can be cancelled. | |
# | |
# # How it works # | |
# Tasks are queued with an optional "on_done" callback and "priority". If no priority is given the default | |
# GLib.PRIORITY_DEFAULT_IDLE will be used. | |
# | |
# The tasks will be executed in order of priority, tasks of the same priority will be executed in FIFO order. | |
# The queued task will return a Gio.Cancellable that can be used to cancel the task at any time before it starts. | |
# | |
# If an "on_done" callback is provided the results of the work done in the worker thread will be passed to the | |
# "on_done" callback. A threading.Condition then puts the worker thread to sleep if an "on_done" callback is provided | |
# to allow for the cancellation of other queued tasks if desired. The results can be retrieved by calling | |
# result.get() or result.release() get returns the actual results and release returns the actual results and wakes the | |
# the worker thread back up. get is usefull if you would like to evaluate the results to decide if you would like to cancel | |
# other tasks otherwise if you don't care to cancel any tasks just use result.release(). | |
# | |
# # Non-working example code # | |
# | |
# class DoesntWork: | |
# def __init__(self): | |
# self.task_b = None | |
# self.async_a() | |
# self.async_b() | |
# self.async_c() | |
# | |
# def async_a(self): | |
# def on_done(result): | |
# a = result.get()#get the results without waking the worker thread up yet | |
# if a is not None: | |
# self.task_b.cancel()#cancel a task while the worker thread is asleep | |
# result.release()#wake the worker thread back up | |
# #do cool stuff with a | |
# | |
# @GLib_async_queue(on_done=on_done) | |
# def async(): | |
# try: | |
# #stuff that might fail | |
# except: | |
# return None | |
# | |
# async()#We don't need the return value. | |
# | |
# def async_b(self): | |
# def on_done(result): | |
# b = result.release()#wake the worker thread back up and return the results | |
# #do cool stuff with b | |
# | |
# @GLib_async_queue(on_done=on_done) | |
# def async(): | |
# #do stuff | |
# | |
# self.task_b = async()#We need the return value. | |
# | |
# @GLib_async_queue() | |
# def async_c(self): | |
# #do stuff in the worker thread that we don't care what the results are |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment