Skip to content

Instantly share code, notes, and snippets.

@thodnev
Created August 25, 2016 22:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thodnev/d2c70d1034d1b8547fa8edf1c297b859 to your computer and use it in GitHub Desktop.
Save thodnev/d2c70d1034d1b8547fa8edf1c297b859 to your computer and use it in GitHub Desktop.
'''
Useful threaded GUI routines
'''
# Author: thodnev
# TODO: add docstrings to methods
import queue
import threading
class ThreadedGuiBaseError(Exception):
    '''
    Common base of the exceptions defined in this module.

    Exists for forward compatibility and as a category: catching it
    catches every more specific error derived from it.
    '''
class ThreadDispatcherError(ThreadedGuiBaseError):
    '''
    Signals an invalid `ThreadDispatcher` state transition, e.g. starting
    a dispatcher that is already running or stopping one that is not.
    '''
class ThreadDispatcher:
    '''
    Schedules widget to periodically check for and run callbacks, dispatched
    by worker threads in main (GUI) thread using shared queue.

    Queue items have form of: (callable, args, kwargs). Trailing elements
    may be omitted: (callable, args) is run without keyword arguments and
    (callable,) is run without any arguments.

    `is_running` instance attribute represents status. Setting it to False
    allows to stop running dispatcher (or could use `stop()` method). Setting
    it to True does not schedule anything (use `start()` method instead).

    To stop further dispatching, pass instance's `stop` method to thread and
    place it on queue inside thread.
    '''

    def __init__(self, widget, queue, interval_ms, max_count=1):
        '''
        Initialize a dispatcher.

        `widget` is an instance of tkinter widget (with `after` method;
        `after_cancel` is used as well when the widget provides it)
        `queue` is an instance of `Queue` shared by main and worker threads
        `interval_ms` defines how often to check queue for callbacks
        `max_count` defines how much callbacks to dispatch at time
        '''
        self.widget = widget
        self.queue = queue
        self.interval = interval_ms
        self.max_count = max_count
        self.is_running = False
        self._finalizing = False
        # value returned by the last `widget.after` call that has not fired
        # yet, or None when no check is currently scheduled
        self._after_id = None

    def start(self, force=False):
        '''
        Used to start checking queue for dispatched callbacks.

        The queue is checked once immediately, then every `interval_ms`.
        When already running and `force` is False raise
        `ThreadDispatcherError`. When `force` is True a running dispatcher
        is restarted: its pending check is cancelled and a pending graceful
        stop is discarded.
        '''
        if self.is_running and not force:
            raise ThreadDispatcherError(
                "Couldn't start already running dispatcher") from None
        # drop a still-pending timer first, otherwise a (re)start would
        # leave two independent polling chains running side by side
        self._cancel_pending()
        self._finalizing = False    # a fresh run must not inherit old stop
        self.is_running = True
        self._gui_callback()        # _gui_callback reschedules itself

    def stop(self, force=False):
        '''
        Used to stop checking queue.

        `force` controls how to stop dispatching, when `False` -- waits for
        queue to become empty, when `True` -- stops immediately.
        When already stopped and `force` is False raise
        `ThreadDispatcherError`.

        Only flags are changed here (the widget is not touched).
        '''
        if force:
            self.is_running = False
            self._finalizing = False
        elif self.is_running:
            self._finalizing = True
        else:
            raise ThreadDispatcherError(
                "Couldn't stop non-running dispatcher") from None

    def _cancel_pending(self):
        '''
        Cancel the scheduled-but-not-yet-fired check, if there is one and
        the widget is able to cancel it.
        '''
        after_id, self._after_id = self._after_id, None
        if after_id is None:
            return
        cancel = getattr(self.widget, 'after_cancel', None)
        if cancel is not None:
            cancel(after_id)

    def _gui_callback(self):
        '''
        The callback actually scheduled to be run by tkinter's `after`.

        Dispatches up to `max_count` queued items and, while the dispatcher
        is still running, schedules the next check.
        '''
        self._after_id = None       # the timer that led here has fired
        if not self.is_running:
            return
        try:
            for _ in range(self.max_count):
                if not self.is_running:     # a callback forced a stop
                    break
                try:
                    item = self.queue.get_nowait()
                except queue.Empty:     # dont dispatch others if empty
                    if self._finalizing:    # finalizing: stop @ empty queue
                        self.is_running = False
                        self._finalizing = False
                    break
                self._process_item(item)
        finally:
            # Reschedule even when a callback raised, so one failing item
            # does not silently end dispatching. `_after_id` is already set
            # when a callback restarted the dispatcher: then the next check
            # is scheduled and must not be duplicated.
            if self.is_running and self._after_id is None:
                self._after_id = self.widget.after(self.interval,
                                                   self._gui_callback)

    def _process_item(self, item):
        '''
        Used to process items from queue.
        Customizable via subclassing

        `item` is (callable, args, kwargs); `kwargs` or both `args` and
        `kwargs` may be left out. Raise `ValueError` for an empty item or
        one with more than three elements.
        '''
        callback, *rest = item
        if len(rest) > 2:
            raise ValueError(
                'queue item must be (callable[, args[, kwargs]]), '
                'got {} elements'.format(len(rest) + 1))
        args = rest[0] if rest else ()
        kwargs = rest[1] if len(rest) > 1 else {}
        callback(*args, **kwargs)
# TODO:
# add logic to automatically create & start dispatcher for *group* of threads
#
# do we really need to call "throw" method "throw"?
class GuiThread(threading.Thread):
    '''
    `threading.Thread` able to pass callbacks to a dispatcher's queue.

    When a dispatcher is attached (through the `dispatcher` argument or by
    redefining the `dispatcher` / `queue` class attributes in a subclass),
    the first thread started for it starts the dispatcher unless it is
    already running, and the last thread to finish puts a request to stop
    it on the queue. With no dispatcher the class acts as a plain `Thread`.
    '''
    # to stop dispatcher after all threads exit
    # format {dispatcher: set(<threads>)}
    threads_alive = {}
    threads_alive_lock = threading.Lock()   # to synchronize access

    # class defaults. If not set on init, these are used
    # could be redefined during subclassing
    dispatcher = None       # None means "act as Thread"
    queue = None

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None, dispatcher=None):
        '''
        Same arguments as `threading.Thread`, plus keyword-only
        `dispatcher`: when given, it and its `queue` attribute are used
        instead of the class-level defaults.
        '''
        super().__init__(group=group, target=target, name=name,
                         args=args, kwargs=kwargs, daemon=daemon)
        if dispatcher is not None:  # else finds all this in class
            self.dispatcher = dispatcher
            self.queue = dispatcher.queue

    def start(self):
        '''
        Register the thread as alive for its dispatcher, start the
        dispatcher if this is its first thread and it is not running yet,
        then start the thread itself.
        '''
        dispatcher = self.dispatcher
        with self.threads_alive_lock:
            alive = self.threads_alive.setdefault(dispatcher, set())
            # An already running dispatcher (started by hand, or whose
            # stop request is still waiting on the queue) is left alone
            # instead of failing with "already running".
            need_start = (not alive and dispatcher is not None
                          and not getattr(dispatcher, 'is_running', False))
            alive.add(self)
        if need_start:
            # Outside the lock: starting a dispatcher runs queued callbacks
            # right away and those may need the lock themselves.
            dispatcher.start()
        # refer to std inherited method logic
        super().start()

    def run(self):
        '''
        Run the thread's target, then unregister the thread. The last
        thread of a dispatcher puts a stop request on the queue, also when
        the target raised.
        '''
        try:
            super().run()
        finally:
            with self.threads_alive_lock:
                alive = self.threads_alive.get(self.dispatcher)
                if alive is None:   # run() was called without start()
                    is_last = False
                else:
                    alive.discard(self)
                    is_last = not alive
                    if is_last:     # drop empty set to prevent growth
                        del self.threads_alive[self.dispatcher]
            if (is_last and self.dispatcher is not None
                    and self.queue is not None):
                # full (callable, args, kwargs) item, as the dispatcher
                # unpacks exactly that form
                self.queue.put([self._stop_dispatcher_if_idle, (), {}])

    def _stop_dispatcher_if_idle(self):
        '''
        Ask the dispatcher to stop once its queue is empty, unless new
        threads were started for it since the request was queued or it is
        not running any more. Intended to be run by the dispatcher.
        '''
        with self.threads_alive_lock:
            busy = bool(self.threads_alive.get(self.dispatcher))
        if not busy and getattr(self.dispatcher, 'is_running', True):
            self.dispatcher.stop(False)

    def throw(self, callback, *args, **kwargs):
        '''
        Put `callback` with its arguments on the queue as a
        (callable, args, kwargs) item. Needs a queue, i.e. an attached
        dispatcher.
        '''
        self.queue.put([callback, args, kwargs])
# TODO??: do we need separate threads_alive in subclass here??
## Maybe we need to define this as class? like:
### class MakeGuiThreadGroup:
### def __new__(cls, widget, interval_ms, max_count):
def guithread_group(widget, interval_ms, max_count):
    '''
    Build a `GuiThread` subclass bound to its own queue and dispatcher.

    A new `queue.Queue` and a `ThreadDispatcher` for `widget` (configured
    with `interval_ms` and `max_count`) are created and stored as the
    `queue` and `dispatcher` class attributes of the returned subclass,
    so every thread spawned from that subclass shares them.
    '''
    shared_queue = queue.Queue()
    shared_dispatcher = ThreadDispatcher(
        widget=widget,
        queue=shared_queue,
        interval_ms=interval_ms,
        max_count=max_count,
    )

    class GuiThreadGroup(GuiThread):
        # class-level defaults picked up by every thread of this group
        dispatcher = shared_dispatcher
        queue = shared_queue

    return GuiThreadGroup
if __name__ == '__main__':      # self-test [dirty code below =)]
    import time
    import tkinter as tk

    print_lock = threading.Lock()

    def lprint(*args, **kwargs):
        # print() under a lock, so output of different threads is not mixed
        with print_lock:
            print(*args, **kwargs)

    # --- part 1: GuiThread without dispatcher must act as plain Thread ---
    def thread_nongui(sleeptime):
        time.sleep(sleeptime)
        lprint('>>\t{}) Non-gui thread'.format(sleeptime))

    nonguis = [GuiThread(target=thread_nongui, args=(delay,))
               for delay in range(3)]
    for th in nonguis:
        th.start()
        with GuiThread.threads_alive_lock:
            lprint('...', GuiThread.threads_alive)
    lprint('>>Non-guis started')
    for th in nonguis:
        th.join()
    with GuiThread.threads_alive_lock:
        lprint('>>', GuiThread.threads_alive)
    print('>>Non-guis finished')

    # --- part 2: callbacks dispatched to the GUI thread ---
    print('*'*79+'\nNow start dispatching test')
    l = tk.Label(text='<initial>')
    que = queue.Queue()
    dsp = ThreadDispatcher(l, que, 500)
    # g_dsp is bound further below, before the button can be pressed
    tk.Button(text='Status', command=lambda: print(
        'Dispatcher status now:', dsp.is_running, g_dsp.is_running)).pack()
    l.pack()
    text = 'First line\nsecond line\nand third\nall\nare great'

    def thread_gui(delay, widget, text, target_queue):
        time.sleep(delay)
        # full (callable, args, kwargs) item: the dispatcher unpacks
        # exactly three elements
        target_queue.put(
            [lambda txt: widget.configure(text=widget['text']+'\n'+txt),
             (text,), {}])

    # threads bound to an explicitly created dispatcher
    guis = [GuiThread(target=thread_gui, args=(num, l, t, que),
                      dispatcher=dsp)
            for num, t in enumerate(text.splitlines(), 1)]
    # threads of a group, which owns its queue and dispatcher
    GuiGroup = guithread_group(l, 500, 1)
    group = [GuiGroup(target=thread_gui, args=(num, l, t, GuiGroup.queue))
             for num, t in enumerate(text.splitlines(), 4)]
    g_dsp = GuiGroup.dispatcher

    print('Dispatcher status before start:', dsp.is_running,
          'Group:', g_dsp.is_running)
    print('Starting group...')
    for th in group:
        th.start()
    print('Dispatcher status after group:', dsp.is_running,
          'Group:', g_dsp.is_running)
    print('Starting threads...')
    for th in guis:
        th.start()
        print('...', GuiThread.threads_alive)
    print('Dispatcher status now:', dsp.is_running, 'Group:', g_dsp.is_running)
    print('Waiting for threads to finish')
    # Callbacks scheduled with `after` only run inside the Tk event loop,
    # so when run as a script the loop has to be entered explicitly.
    tk.mainloop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment