Skip to content

Instantly share code, notes, and snippets.

@JasonLG1979
Last active August 28, 2016 23:51
Show Gist options
  • Save JasonLG1979/8b90a75919ce3b5fb63788b62dae0845 to your computer and use it in GitHub Desktop.
Save JasonLG1979/8b90a75919ce3b5fb63788b62dae0845 to your computer and use it in GitHub Desktop.
Working GLib_async_queue example
#
# Copyright (C) 2016 Jason Gray <jasonlevigray3@gmail.com>
#
#This program is free software: you can redistribute it and/or modify it
#under the terms of the GNU General Public License version 3, as published
#by the Free Software Foundation.
#
#This program is distributed in the hope that it will be useful, but
#WITHOUT ANY WARRANTY; without even the implied warranties of
#MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
#PURPOSE. See the GNU General Public License for more details.
#
#You should have received a copy of the GNU General Public License along
#with this program. If not, see <http://www.gnu.org/licenses/>.
### END LICENSE
import threading
import queue
import time
import random
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk, GLib, Gio
class Result:
def __init__(self, condition, result):
self._result = result
self._condition = condition
def get(self):
return self._result
def release(self):
with self._condition:
self._condition.notify()
return self._result
class Worker(threading.Thread):
def __init__(self):
super().__init__()
self.queue = queue.PriorityQueue()
self.fifo_priority = 0
self.daemon = True
self.start()
def run(self):
while True:
priority, _, f, args, kwargs, on_done, cancellable = self.queue.get()
if not cancellable.is_cancelled():
result = f(*args, **kwargs)
if on_done:
condition = threading.Condition()
GLib.idle_add(on_done, Result(condition, result), priority=priority)
with condition:
condition.wait()
self.queue.task_done()
def queue_task(self, priority, f, args, kwargs, on_done, cancellable):
self.fifo_priority += 1
self.queue.put_nowait((priority, self.fifo_priority, f, args, kwargs, on_done, cancellable))
worker = Worker()
def GLib_async_queue(on_done=None, priority=GLib.PRIORITY_DEFAULT_IDLE):
def wrapper(f):
def run(*args, **kwargs):
cancellable = Gio.Cancellable()
worker.queue_task(priority, f, args, kwargs, on_done, cancellable)
return cancellable
return run
return wrapper
class QueueTest(Gtk.Window):
def __init__(self):
Gtk.Window.__init__(self, title='Queue Test')
self.time_sec = 0
self.start_time = None
self.should_have_took = 0.0
self.canceled_call_counter = 0
self.call_order_counter = 0
self.return_order_counter = 0
self.complete_message = []
self.set_default_size(350, -1)
self.set_resizable(False)
self.set_border_width(10)
self.headerbar = Gtk.HeaderBar()
self.headerbar.set_show_close_button(True)
self.headerbar.set_title('Queue Test')
self.headerbar.set_subtitle('00:00:00')
self.set_titlebar(self.headerbar)
vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=10)
self.add(vbox)
self.button = Gtk.Button.new_with_label('Get Queue Test Results')
self.button.connect('clicked', self.async_queue_test)
vbox.pack_start(self.button, False, False, 0)
self.label = Gtk.Label()
vbox.pack_start(self.label, True, True, 0)
GLib.timeout_add(1000, self.update_time)
def async_queue_test(self, *ignore):
self.button.set_sensitive(False)
self.should_have_took = 0.0
self.canceled_call_counter = 0
self.call_order_counter = 0
self.return_order_counter = 0
self.complete_message = []
self.label.set_label('')
self.complete_message = []
self.start_time = None
def get_message(result):
self.return_order_counter += 1
priority, order_called, sleep_time = result.release()
if self.return_order_counter > (self.call_order_counter - self.canceled_call_counter):
return
self.should_have_took += sleep_time
message = '{} priority: Call order {}, return order {}, slept for {} secs.'.format(priority, order_called, self.return_order_counter, sleep_time)
self.complete_message.append(message)
if self.return_order_counter == (self.call_order_counter - self.canceled_call_counter):
time_took = time.time() - self.start_time
done_message = '\nQueued {} random priority and duration tasks.\nRandomly cancelled {} task(s).'.format(self.call_order_counter, self.canceled_call_counter)
time_message = 'Should have taken {} secs.\nActually took {} secs.'.format(self.should_have_took, time_took)
overhead_message = '{} secs lantancy.'.format(time_took - self.should_have_took)
self.complete_message.append(done_message)
self.complete_message.append(time_message)
self.complete_message.append(overhead_message)
self.button.set_sensitive(True)
label_text = '\n'.join(self.complete_message)
self.label.set_label(label_text)
@GLib_async_queue(on_done=get_message, priority=GLib.PRIORITY_HIGH)
def say_high(order_called, sleep_time):
time.sleep(sleep_time)
return 'High', order_called, sleep_time
@GLib_async_queue(on_done=get_message, priority=GLib.PRIORITY_DEFAULT)
def say_default(order_called, sleep_time):
time.sleep(sleep_time)
return 'Default', order_called, sleep_time
@GLib_async_queue(on_done=get_message, priority=GLib.PRIORITY_DEFAULT_IDLE)
def say_idle(order_called, sleep_time):
time.sleep(sleep_time)
return 'Idle', order_called, sleep_time
@GLib_async_queue(on_done=get_message, priority=GLib.PRIORITY_LOW)
def say_low(order_called, sleep_time):
time.sleep(sleep_time)
return 'Low', order_called, sleep_time
called = []
random_number_of_calls = random.randint(2, 24)
random_number_cancels = random.randint(1, random_number_of_calls -1)
self.start_time = time.time()
for call in (random.choice((say_high, say_default, say_idle, say_low)) for i in range(random_number_of_calls)):
self.call_order_counter += 1
test_call = call(self.call_order_counter, random.uniform(0.1, 1.0))
called.append(test_call)
while True:
random_call = random.choice(called)
if not random_call.is_cancelled():
if self.canceled_call_counter < random_number_cancels:
random_call.cancel()
self.canceled_call_counter += 1
else:
break
def update_time(self):
self.time_sec += 1
time_int = self.time_sec
s = time_int % 60
time_int //= 60
m = time_int % 60
time_int //= 60
h = time_int
self.headerbar.set_subtitle('{:02d}:{:02d}:{:02d}'.format(h, m, s))
return True
if __name__ == '__main__':
win = QueueTest()
win.connect('delete-event', Gtk.main_quit)
win.show_all()
Gtk.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment