Skip to content

Instantly share code, notes, and snippets.

@npodonnell
Last active December 29, 2020 22:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save npodonnell/ad1621ebd6c742d36eaadaad6a500814 to your computer and use it in GitHub Desktop.
Save npodonnell/ad1621ebd6c742d36eaadaad6a500814 to your computer and use it in GitHub Desktop.
Python Worker
#!/usr/bin/env python3
# Simple worker which can run a function in a different thread, and
# catch exceptions so they can be handled in the calling thread.
#
# N. P. O'Donnell, 2020
from threading import Thread
from typing import Callable
import random
import time
# Chance (0.0 - 1.0) that any single iteration of do_work() raises its simulated failure.
EXCEPTION_PROBABILITY = 0.05
def do_work():
    """
    Demo workload: count from 0 to 9, one step per second, failing at random.
    On every step the counter is printed, then a random draw is compared against
    EXCEPTION_PROBABILITY to decide whether to raise.
    :return: 3.14159 once all ten steps have completed without a failure.
    :raises Exception: "Random exception", when the random draw for a step falls below the threshold.
    """
    step = 0
    while step < 10:
        print(step)
        should_fail = random.random() < EXCEPTION_PROBABILITY
        if should_fail:
            raise Exception("Random exception")
        # Pause only after a step that did not fail.
        time.sleep(1)
        step += 1
    return 3.14159
class Worker(Thread):
    """
    Worker with exception handling functionality.
    Exceptions raised by the target can be handled in 2 alternative ways:
    1. Providing a callback to the constructor (exc_callback) which is called as soon as an exception is raised.
       Note that the callback is invoked from the worker thread, not from the calling thread.
    2. By wrapping a call to join() in a try-except and handling the exception in the except clause.
       This only applies when no callback was given; join() then re-raises the exception in the calling thread.
    When the target returns normally its result is stored in the ``return_value`` attribute. The attribute is
    only created on success, so it does not exist if the target raised or if no target was supplied.
    """

    def __init__(self, target=None, args=(), kwargs=None, exc_callback: Callable[[BaseException], None] = None):
        """
        Constructor.
        :param target: Function to run in new thread.
        :param args: Arguments.
        :param kwargs: Keyword Arguments.
        :param exc_callback: Optional callable invoked (in the worker thread) with any exception the target raises.
                             When given, the exception is not stored and join() will not re-raise it.
        """
        super().__init__(target=target, args=args, kwargs=kwargs)
        self._exc_callback = exc_callback
        self._exception = None

    def run(self):
        """
        Thread run function.
        Calls the target, storing its result in ``return_value``. Any exception is either passed to
        the callback or kept for join() to re-raise.
        """
        if self._target is None:
            # Same as Thread.run(): a worker without a target does nothing. Calling None here
            # would store a misleading TypeError for join() to raise.
            return
        try:
            self.return_value = self._target(*self._args, **self._kwargs)
        except BaseException as e:
            # BaseException (not just Exception) so that nothing raised by the target is lost.
            # Compare against None explicitly: a callback or exception object may be falsy.
            if self._exc_callback is not None:
                self._exc_callback(e)
            else:
                self._exception = e

    def join(self, timeout_s: float = None, *, timeout: float = None):
        """
        Wait for target to stop running and check if it threw an exception.
        If an exception is found, raise it so the calling thread is alerted.
        A stored exception is raised only once; later calls to join() return normally.
        If the wait times out before the target has finished, nothing is raised by this call.
        :param timeout_s: Number of seconds to wait before continuing.
        :param timeout: Alias for timeout_s, accepted so that code written against Thread.join(timeout=...)
                        keeps working. Ignored when timeout_s is given.
        :raises BaseException: Whatever the target raised, if no exc_callback was provided.
        """
        if timeout_s is None:
            timeout_s = timeout
        super().join(timeout=timeout_s)
        exc = self._exception
        if exc is not None:
            # Clear before raising so the same exception is not reported twice.
            self._exception = None
            raise exc
def handle_exc(e: Exception):
    """
    Exception callback used by the demo: report the exception on stdout.
    :param e: Exception caught.
    """
    report = f"Caught with callback: {e}"
    print(report)
def main():
    """
    Demo entry point: run do_work() in two workers at once, one for each
    exception-handling style, and print what each of them produced.
    """
    # One worker per style: the first reports failures through join(),
    # the second hands them to the handle_exc callback.
    join_worker = Worker(do_work)
    callback_worker = Worker(do_work, exc_callback=handle_exc)

    # Start both before waiting on either so they run side by side.
    for worker in (join_worker, callback_worker):
        worker.start()

    try:
        join_worker.join()
        print(f"w1 returned: {join_worker.return_value}")
    except Exception as err:
        print(f"Caught with join: {err}")

    callback_worker.join()
    # return_value is only assigned when the target returned normally,
    # so its presence tells us whether the second worker succeeded.
    if hasattr(callback_worker, "return_value"):
        print(f"w2 returned: {callback_worker.return_value}")

    print("Exiting")
# Run the demo only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment