Skip to content

Instantly share code, notes, and snippets.

@yeraydiazdiaz
Last active February 6, 2021 04:59
Show Gist options
  • Star 10 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yeraydiazdiaz/b8c059c6dcfaf3255c65806de39175a7 to your computer and use it in GitHub Desktop.
Save yeraydiazdiaz/b8c059c6dcfaf3255c65806de39175a7 to your computer and use it in GitHub Desktop.
Cancellation on run_in_executor using ThreadPoolExecutor
"""
A demonstration on how raising KeyboardInterrupt in the context of tasks
spawned via asyncio's loop.run_in_executor does not cancel the threads
using any of the cancellation methods in asyncio Futures.
The only "proper" way to cancel is to:
1. unregister the `atexit` registered `_python_exit` function
2. call `shutdown(wait=False)`
The reason is that the `thread` module registers `_python_exit` forcing a
join of the remaining worker threads (https://github.com/python/cpython/blob/master/Lib/concurrent/futures/thread.py#L33-L42)
This alone is not enough as `shutdown` will also join the threads by default
so we need to pass `wait=False` to the call as well.
"""
import asyncio
import atexit
import time
import concurrent.futures
def sleepy(n=10):
try:
print('Sleepy')
time.sleep(n)
print('Done sleeping')
return n
except Exception as e:
# not triggered by cancellation
print('Got an exception ', e)
raise
def main(executor):
try:
print(f'Running sleepy in executor {executor}')
futures = [loop.run_in_executor(executor, sleepy) for _ in range(10)]
return futures
except Exception as e:
# not triggered by cancellation
print('Got an exception ', e)
raise
loop = asyncio.get_event_loop()
atexit.unregister(concurrent.futures.thread._python_exit)
try:
executor = concurrent.futures.ThreadPoolExecutor()
futures = main(executor)
future = asyncio.gather(*futures)
result = loop.run_until_complete(future)
print('Result: ', result)
except KeyboardInterrupt:
print('Main thread')
# cancelling the gather future marks them as cancelled but doesn't stop the threads
future.cancel()
for fut in futures:
if not fut.done():
# this will not be executed as cancelling the parent gather cancels child futures
print('Cancelling: ', fut)
fut.cancel()
# shutdown is design to stop new work coming in, but not to stop already existing work
# https://github.com/python/cpython/blob/master/Lib/concurrent/futures/thread.py#L198-L204
executor.shutdown(wait=False)
finally:
# when threads return the loop is already closed and a lot of
# `RuntimeError: Event loop is closed` are raised
loop.close()
@sergshner
Copy link

In Python >= 3.7 the following part will return an error:

atexit.unregister(concurrent.futures.thread._python_exit)
Traceback (most recent call last):
  File "mssql_backup_threaded.py", line 197, in <module>
    atexit.unregister(concurrent.futures.thread._python_exit)
  File "C:\Users\sshnerson\AppData\Local\Programs\Python\Python38-32\lib\concurr
ent\futures\__init__.py", line 53, in __getattr__
    raise AttributeError(f"module {__name__} has no attribute {name}")
AttributeError: module concurrent.futures has no attribute thread

In order to workaround this issue you can add the following line to the importing part of your program:

from concurrent.futures import thread

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment