Skip to content

Instantly share code, notes, and snippets.

@nasingfaund
Forked from dsh0005/tktest.py
Created March 31, 2023 12:39
Show Gist options
  • Save nasingfaund/9ac047dd6e9fad7391b80d608fcfa7cf to your computer and use it in GitHub Desktop.
Save nasingfaund/9ac047dd6e9fad7391b80d608fcfa7cf to your computer and use it in GitHub Desktop.
Tkinter background threading example
#!/usr/bin/env python3
# encoding=utf-8
# vim: set nobomb:
# NOTE: the tkinter star import must stay first -- tkinter exports its own
# ``Event`` class, which the threading import below deliberately shadows.
from tkinter import *
import queue
import random
from queue import SimpleQueue
from threading import Thread, Lock, Event
from time import sleep
from typing import List
class WorkerThread(Thread):
    """Background thread that reports an increasing counter to the GUI.

    The thread keeps calling ``callback(idx, count)`` with ``count`` starting
    at 1 until ``interface.exit_event`` is set.
    """

    def __init__(self, interface: 'HMI', callback, idx: int):
        """Store the collaborators; the thread is not started here.

        Args:
            interface: object exposing an ``exit_event`` that signals shutdown.
            callback: callable invoked as ``callback(idx, count)`` on every tick.
            idx: identifier of this worker, passed back to ``callback``.
        """
        super().__init__()
        self.interface = interface
        self.callback = callback
        self.idx = idx

    def run(self):
        """Count upwards, reporting each value, until shutdown is requested."""
        count = 0
        while not self.interface.exit_event.is_set():
            count += 1
            self.callback(self.idx, count)
            # Random pause of under one millisecond between reports.
            sleep(random.random() / 1000)
class HMI:
    """Tk window with two labels that are updated from background threads.

    A worker hands over a value by putting it on a per-worker queue and then
    posting a virtual event on the root window.  The handler bound to that
    event drains the queue into the matching label.  One lock per worker
    guards the ``event_generate`` call so that ``closing`` can wait until no
    worker is in the middle of posting an event.
    """

    def __init__(self):
        """Create the window, the labels, the queues and the event bindings."""
        self._exit_event = Event()
        self._event_locks = [Lock() for _ in range(2)]
        # Locks acquired by closing(); start() releases exactly these.
        self._held_locks = []
        self.master = Tk()
        self.master.geometry('200x200+1+1')
        f = Frame(self.master)
        f.pack()
        self.l0 = Label(f)
        self.l0.pack()
        self.l1 = Label(f)
        self.l1.pack()
        self.q0 = SimpleQueue()
        self.q1 = SimpleQueue()
        self.master.bind("<<Thread_0_Label_Update>>", self.thread_0_update_e)
        self.master.bind("<<Thread_1_Label_Update>>", self.thread_1_update_e)
        self.master.protocol("WM_DELETE_WINDOW", self.closing)

    @property
    def exit_event(self) -> Event:
        """Event that is set once the workers should stop."""
        return self._exit_event

    @property
    def event_locks(self) -> List[Lock]:
        """Per-worker locks, indexed by worker id."""
        return self._event_locks

    def start(self):
        """Run the Tk main loop and release the shutdown locks afterwards."""
        self.master.mainloop()
        # Already set when closing() ran; if the main loop ended some other
        # way, this still tells the workers to stop.
        self.exit_event.set()
        # now that we've shut down, and the workers aren't going to send more
        # messages, this is safe.  Only locks that closing() really acquired
        # are released: releasing an unlocked Lock raises RuntimeError.
        while self._held_locks:
            self._held_locks.pop().release()

    def closing(self):
        """Handle WM_DELETE_WINDOW: stop the workers, then destroy the window."""
        # set this event to tell the workers to exit
        self.exit_event.set()
        # acquire these locks so we know the workers aren't sending more messages
        for lock in self.event_locks:
            while not lock.acquire(False):
                # someone is waiting on us to pump messages while they hold the lock
                self.master.update()
            self._held_locks.append(lock)
        # now that we have the locks, it's safe to start shutting down
        self.master.destroy()

    #################################
    def _post_update(self, pending, idx, val, event_name):
        """Queue ``val`` and, unless shutting down, post ``event_name``."""
        pending.put_nowait(val)
        with self.event_locks[idx]:
            if not self.exit_event.is_set():
                self.master.event_generate(event_name, when='tail')

    def _show_latest(self, pending, label):
        """Drain ``pending`` and show its most recent value in ``label``."""
        missing = object()
        latest = missing
        while True:
            try:
                latest = pending.get_nowait()
            except queue.Empty:
                break
        # Every queued value would be overwritten by the next one before the
        # window redraws, so configuring the label once with the newest value
        # is enough.
        if latest is not missing:
            label.config(text=str(latest))

    def thread_0_update(self, idx, val):
        """Worker callback: hand ``val`` over for display in the first label."""
        self._post_update(self.q0, idx, val, '<<Thread_0_Label_Update>>')

    def thread_1_update(self, idx, val):
        """Worker callback: hand ``val`` over for display in the second label."""
        self._post_update(self.q1, idx, val, '<<Thread_1_Label_Update>>')

    def thread_0_update_e(self, e):
        """Handler for <<Thread_0_Label_Update>>; ``e`` is not used."""
        self._show_latest(self.q0, self.l0)

    def thread_1_update_e(self, e):
        """Handler for <<Thread_1_Label_Update>>; ``e`` is not used."""
        self._show_latest(self.q1, self.l1)
##########################
def main():
    """Create the window, start both workers and run the GUI until it closes."""
    hmi = HMI()
    workers = [
        WorkerThread(hmi, hmi.thread_0_update, 0),
        WorkerThread(hmi, hmi.thread_1_update, 1),
    ]
    for worker in workers:
        worker.start()
    hmi.start()
    # hmi.start() only returns once the main loop has ended; wait for the
    # workers to finish before returning.
    for worker in workers:
        worker.join()
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment