Skip to content

Instantly share code, notes, and snippets.

@karlp
Created January 20, 2023 13:01
Show Gist options
  • Save karlp/3a277c57feac2c924b851afbf395e4e8 to your computer and use it in GitHub Desktop.
Save karlp/3a277c57feac2c924b851afbf395e4e8 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Testing handling exceptions from threads inside asyncio
"""
import asyncio
import logging
import random
import threading
import time
logging.basicConfig(format='%(asctime)s %(thread)x %(threadName)s [%(levelname)s] %(name)s %(message)s', level=logging.DEBUG)
class MyCrashingThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.log = logging.getLogger("ThreadCrasher")
def run(self) -> None:
i = 1
while True:
r = random.randrange(0, 1000, 1)
time.sleep(0.5 + r/1000)
self.log.info("not crashed yet: %d", i)
i += 1
if random.randrange(0, 10, 1) > 5:
raise RuntimeError("Explosion in the thread factory")
class DemoApp:
def __init__(self):
self._loop = None
async def task_demo1(self, name, tag):
log = logging.getLogger(name)
i = 1
while True:
log.info("%s: %d", tag, i)
i += 1
r = random.randrange(0, 1000, 1)
await asyncio.sleep(0.5 + r/1000)
async def task_crasher(self):
log = logging.getLogger("Crasher")
i = 1
while True:
r = random.randrange(0, 1000, 1)
await asyncio.sleep(0.5 + r/1000)
log.info("not crashed yet: %d", i)
i += 1
if random.randrange(0, 10, 1) > 5:
raise RuntimeError("Explosion on aisle 3")
async def task_crasher_threaded(self):
"""
An example task that we expect to run forever.
For 'reasons' it uses another thread, and we want to make sure that
if anything happens in _that_ thread, we want to actually abort our task
"""
log = logging.getLogger("CrasherThreaded")
stop_never = asyncio.Event()
def _handle_thread_exceptions(ee):
"""Lol, this gets called in the thread that excepted..."""
# Don't log it twice...
#log.error("unhandled exception in thread! %s", ee)
self._loop.call_soon_threadsafe(stop_never.set)
raise ee.exc_type(ee.exc_value).with_traceback(ee.exc_traceback)
# Attempt to catch unhandled thread exceptions
threading.excepthook = _handle_thread_exceptions
log.info("Creating a thread...")
tt = MyCrashingThread()
tt.start()
log.info("Waiting for ever...")
await stop_never.wait()
raise RuntimeError("Unexpected completion of thread")
async def task_crasher_threaded_alt(self):
"""
An example task that we expect to run forever.
For 'reasons' it uses another thread, and we want to make sure that
if anything happens in _that_ thread, we want to actually abort our task
This version uses .is_alive() instead of excepthooks
"""
log = logging.getLogger("CrasherThreadedAlt")
log.info("Creating a thread...")
tt = MyCrashingThread()
tt.start()
log.info("Waiting for ever...")
while True:
tt.join(0.001)
if not tt.is_alive():
raise RuntimeError("Unexpected completion of thread")
await asyncio.sleep(1)
async def task_crasher_threaded_alt2(self):
"""
An example task that we expect to run forever.
For 'reasons' it uses another thread, and we want to make sure that
if anything happens in _that_ thread, we want to actually abort our task
This version tries to magically create an exception in the outer asyncio thread...
This _DOES NOT WORK_ as the exception, while on the main thread, is somehow...
not within the coroutine? so it's printed, but doesn't cause the .gather to finish
"""
log = logging.getLogger("CrasherThreadedAlt2")
stop_never = asyncio.Event()
def banger(ee):
raise RuntimeError("wheeee")
def _handle_thread_exceptions(ee):
"""Lol, this gets called in the thread that excepted..."""
# Don't log it twice...
#log.error("unhandled exception in thread! %s", ee)
self._loop.call_soon_threadsafe(banger, ee)
# Attempt to catch unhandled thread exceptions
threading.excepthook = _handle_thread_exceptions
log.info("Creating a thread...")
tt = MyCrashingThread()
tt.start()
log.info("Waiting for ever...")
await stop_never.wait()
raise RuntimeError("Unexpected completion of thread")
async def run(self):
self._loop = asyncio.get_running_loop()
L = await asyncio.gather(
self.task_demo1("Demo1", "hoho"),
# The threaded example needs extra handling.
#self.task_crasher_threaded(),
# This one uses thread alive checks...
# self.task_crasher_threaded_alt(),
self.task_crasher_threaded_alt2(),
# An unhandled exception in this raw task just works.
# self.task_crasher(),
self.task_demo1("Demo2", "blah")
)
# We are expecting an exception, and want to break out, or ... have options
# at least, on what we should be doing.
print("This should never be reached: ", L)
if __name__ == "__main__":
app = DemoApp()
asyncio.run(app.run(), debug=False)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment