Last active
August 28, 2016 23:51
-
-
Save JasonLG1979/8b90a75919ce3b5fb63788b62dae0845 to your computer and use it in GitHub Desktop.
Working GLib_async_queue example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# Copyright (C) 2016 Jason Gray <jasonlevigray3@gmail.com> | |
# | |
#This program is free software: you can redistribute it and/or modify it | |
#under the terms of the GNU General Public License version 3, as published | |
#by the Free Software Foundation. | |
# | |
#This program is distributed in the hope that it will be useful, but | |
#WITHOUT ANY WARRANTY; without even the implied warranties of | |
#MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR | |
#PURPOSE. See the GNU General Public License for more details. | |
# | |
#You should have received a copy of the GNU General Public License along | |
#with this program. If not, see <http://www.gnu.org/licenses/>. | |
### END LICENSE | |
import threading | |
import queue | |
import time | |
import random | |
import gi | |
gi.require_version('Gtk', '3.0') | |
from gi.repository import Gtk, GLib, Gio | |
class Result: | |
def __init__(self, condition, result): | |
self._result = result | |
self._condition = condition | |
def get(self): | |
return self._result | |
def release(self): | |
with self._condition: | |
self._condition.notify() | |
return self._result | |
class Worker(threading.Thread): | |
def __init__(self): | |
super().__init__() | |
self.queue = queue.PriorityQueue() | |
self.fifo_priority = 0 | |
self.daemon = True | |
self.start() | |
def run(self): | |
while True: | |
priority, _, f, args, kwargs, on_done, cancellable = self.queue.get() | |
if not cancellable.is_cancelled(): | |
result = f(*args, **kwargs) | |
if on_done: | |
condition = threading.Condition() | |
GLib.idle_add(on_done, Result(condition, result), priority=priority) | |
with condition: | |
condition.wait() | |
self.queue.task_done() | |
def queue_task(self, priority, f, args, kwargs, on_done, cancellable): | |
self.fifo_priority += 1 | |
self.queue.put_nowait((priority, self.fifo_priority, f, args, kwargs, on_done, cancellable)) | |
worker = Worker() | |
def GLib_async_queue(on_done=None, priority=GLib.PRIORITY_DEFAULT_IDLE): | |
def wrapper(f): | |
def run(*args, **kwargs): | |
cancellable = Gio.Cancellable() | |
worker.queue_task(priority, f, args, kwargs, on_done, cancellable) | |
return cancellable | |
return run | |
return wrapper | |
class QueueTest(Gtk.Window): | |
def __init__(self): | |
Gtk.Window.__init__(self, title='Queue Test') | |
self.time_sec = 0 | |
self.start_time = None | |
self.should_have_took = 0.0 | |
self.canceled_call_counter = 0 | |
self.call_order_counter = 0 | |
self.return_order_counter = 0 | |
self.complete_message = [] | |
self.set_default_size(350, -1) | |
self.set_resizable(False) | |
self.set_border_width(10) | |
self.headerbar = Gtk.HeaderBar() | |
self.headerbar.set_show_close_button(True) | |
self.headerbar.set_title('Queue Test') | |
self.headerbar.set_subtitle('00:00:00') | |
self.set_titlebar(self.headerbar) | |
vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=10) | |
self.add(vbox) | |
self.button = Gtk.Button.new_with_label('Get Queue Test Results') | |
self.button.connect('clicked', self.async_queue_test) | |
vbox.pack_start(self.button, False, False, 0) | |
self.label = Gtk.Label() | |
vbox.pack_start(self.label, True, True, 0) | |
GLib.timeout_add(1000, self.update_time) | |
def async_queue_test(self, *ignore): | |
self.button.set_sensitive(False) | |
self.should_have_took = 0.0 | |
self.canceled_call_counter = 0 | |
self.call_order_counter = 0 | |
self.return_order_counter = 0 | |
self.complete_message = [] | |
self.label.set_label('') | |
self.complete_message = [] | |
self.start_time = None | |
def get_message(result): | |
self.return_order_counter += 1 | |
priority, order_called, sleep_time = result.release() | |
if self.return_order_counter > (self.call_order_counter - self.canceled_call_counter): | |
return | |
self.should_have_took += sleep_time | |
message = '{} priority: Call order {}, return order {}, slept for {} secs.'.format(priority, order_called, self.return_order_counter, sleep_time) | |
self.complete_message.append(message) | |
if self.return_order_counter == (self.call_order_counter - self.canceled_call_counter): | |
time_took = time.time() - self.start_time | |
done_message = '\nQueued {} random priority and duration tasks.\nRandomly cancelled {} task(s).'.format(self.call_order_counter, self.canceled_call_counter) | |
time_message = 'Should have taken {} secs.\nActually took {} secs.'.format(self.should_have_took, time_took) | |
overhead_message = '{} secs lantancy.'.format(time_took - self.should_have_took) | |
self.complete_message.append(done_message) | |
self.complete_message.append(time_message) | |
self.complete_message.append(overhead_message) | |
self.button.set_sensitive(True) | |
label_text = '\n'.join(self.complete_message) | |
self.label.set_label(label_text) | |
@GLib_async_queue(on_done=get_message, priority=GLib.PRIORITY_HIGH) | |
def say_high(order_called, sleep_time): | |
time.sleep(sleep_time) | |
return 'High', order_called, sleep_time | |
@GLib_async_queue(on_done=get_message, priority=GLib.PRIORITY_DEFAULT) | |
def say_default(order_called, sleep_time): | |
time.sleep(sleep_time) | |
return 'Default', order_called, sleep_time | |
@GLib_async_queue(on_done=get_message, priority=GLib.PRIORITY_DEFAULT_IDLE) | |
def say_idle(order_called, sleep_time): | |
time.sleep(sleep_time) | |
return 'Idle', order_called, sleep_time | |
@GLib_async_queue(on_done=get_message, priority=GLib.PRIORITY_LOW) | |
def say_low(order_called, sleep_time): | |
time.sleep(sleep_time) | |
return 'Low', order_called, sleep_time | |
called = [] | |
random_number_of_calls = random.randint(2, 24) | |
random_number_cancels = random.randint(1, random_number_of_calls -1) | |
self.start_time = time.time() | |
for call in (random.choice((say_high, say_default, say_idle, say_low)) for i in range(random_number_of_calls)): | |
self.call_order_counter += 1 | |
test_call = call(self.call_order_counter, random.uniform(0.1, 1.0)) | |
called.append(test_call) | |
while True: | |
random_call = random.choice(called) | |
if not random_call.is_cancelled(): | |
if self.canceled_call_counter < random_number_cancels: | |
random_call.cancel() | |
self.canceled_call_counter += 1 | |
else: | |
break | |
def update_time(self): | |
self.time_sec += 1 | |
time_int = self.time_sec | |
s = time_int % 60 | |
time_int //= 60 | |
m = time_int % 60 | |
time_int //= 60 | |
h = time_int | |
self.headerbar.set_subtitle('{:02d}:{:02d}:{:02d}'.format(h, m, s)) | |
return True | |
if __name__ == '__main__': | |
win = QueueTest() | |
win.connect('delete-event', Gtk.main_quit) | |
win.show_all() | |
Gtk.main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment