Skip to content

Instantly share code, notes, and snippets.

@callmewhy
Forked from cliffxuan/defer.py
Last active May 27, 2016 06:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save callmewhy/4311c6d054dfa729454fe7858ffee993 to your computer and use it in GitHub Desktop.
Save callmewhy/4311c6d054dfa729454fe7858ffee993 to your computer and use it in GitHub Desktop.
wxPython non-blocking GUI with a long-running task and timeout
import threading
import Queue
import wx
def run(function,
        args=None,
        timeout=None,
        finish_function=None,
        timeout_function=None,
        exception_function=None,
        daemon=True):
    """Run ``function(*args)`` on a background thread and report via callbacks.

    A ``WorkerThread`` executes the call and posts its outcome on a private
    queue; a ``MonitorThread`` waits on that queue for at most ``timeout``
    seconds and schedules the matching callback through ``wx.CallAfter``.
    This function returns immediately after starting both threads.

    Args:
        function: Callable to execute off the GUI thread.
        args: Positional arguments for ``function``; defaults to ``()``.
        timeout: Seconds the monitor waits for an outcome; ``None`` waits
            indefinitely.
        finish_function: Called with the return value of ``function``.
        timeout_function: Called with no arguments when no outcome arrived
            within ``timeout``.
        exception_function: Called with the exception raised by ``function``.
        daemon: When true (the default) both helper threads are daemon
            threads. Pass ``False`` to make interpreter exit wait for them.

    Note:
        Reaching the timeout does not stop the worker; ``function`` keeps
        running in the background until it returns.
    """
    if args is None:
        args = ()
    result_queue = Queue.Queue()
    worker_thread = WorkerThread(function, args, result_queue)
    monitor_thread = MonitorThread(result_queue,
                                   timeout,
                                   finish_function,
                                   timeout_function,
                                   exception_function)
    # A timed-out or hung task cannot be cancelled, so a non-daemon worker
    # would keep the process alive after the GUI main loop has exited.
    worker_thread.daemon = daemon
    monitor_thread.daemon = daemon
    worker_thread.start()
    monitor_thread.start()
class WorkerThread(threading.Thread):
    """Thread that calls ``function(*args)`` once and queues the outcome.

    Exactly one tuple is put on ``result_queue``: ``(True, return_value)``
    when the call returns, or ``(False, exception)`` when it raises an
    ``Exception``.
    """

    def __init__(self, function, args, result_queue):
        super(WorkerThread, self).__init__()
        self.result_queue = result_queue
        self.function = function
        self.args = args

    def run(self):
        """Execute the stored call and publish its outcome on the queue."""
        try:
            outcome = (True, self.function(*self.args))
        except Exception as error:
            outcome = (False, error)
        self.result_queue.put(outcome)
class MonitorThread(threading.Thread):
    """Thread that waits for one outcome and schedules the matching callback.

    It blocks on ``result_queue`` for up to ``timeout`` seconds (``None``
    blocks until an item arrives), expecting a ``(finished, result)`` tuple.
    Callbacks are handed to ``wx.CallAfter`` rather than called directly,
    and any callback left unset is skipped.
    """

    def __init__(self,
                 result_queue,
                 timeout,
                 finish_function,
                 timeout_function,
                 exception_function):
        super(MonitorThread, self).__init__()
        self.result_queue = result_queue
        self.timeout = timeout
        self.finish_function = finish_function
        self.timeout_function = timeout_function
        self.exception_function = exception_function

    def run(self):
        """Wait for the outcome, then dispatch to finish/exception/timeout."""
        try:
            succeeded, payload = self.result_queue.get(True, self.timeout)
        except Queue.Empty:
            # Nothing arrived within the allowed time.
            if self.timeout_function is not None:
                wx.CallAfter(self.timeout_function)
            return

        if not succeeded:
            # payload is the exception forwarded by the worker.
            if self.exception_function:
                wx.CallAfter(self.exception_function, payload)
            return

        if self.finish_function is not None:
            wx.CallAfter(self.finish_function, payload)
def test():
    """Manual demo: run a randomly slow task against a random timeout.

    Both the task duration and the timeout are drawn from 1-4 seconds. The
    task raises when its duration is even and returns ``'finished'``
    otherwise; the label shows whichever callback was scheduled.
    """
    import time
    import random

    timeout_seconds = random.choice(range(1, 5))
    task_seconds = random.choice(range(1, 5))

    def slow_function(wait):
        time.sleep(wait)
        if wait % 2:
            return 'finished'
        raise Exception('ERROR ERROR ERROR')

    def result_callback(result):
        status.SetLabel(result)

    def timeout_callback():
        status.SetLabel('timeout')

    def exception_callback(e):
        print(e)
        status.SetLabel('exception')

    app = wx.App()
    frame = wx.Frame(None, wx.ID_ANY, "defer test with random time")
    panel = wx.Panel(frame, wx.ID_ANY)
    sizer = wx.BoxSizer(wx.VERTICAL)
    panel.SetSizer(sizer)

    status = wx.StaticText(panel, wx.ID_ANY)
    status.SetLabel(
        '%ss to run | %ss to timeout' % (task_seconds, timeout_seconds))
    sizer.Add(status)
    frame.Show()

    run(slow_function,
        (task_seconds,),
        timeout=timeout_seconds,
        finish_function=result_callback,
        timeout_function=timeout_callback,
        exception_function=exception_callback)
    app.MainLoop()
# Launch the interactive demo only when executed as a script, not on import.
if __name__ == "__main__":
    test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment