Python ThreadPoolExecutor (non-)graceful shutdown
#!/usr/bin/env python3

import concurrent.futures.thread
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def remove_file(path):
    print('Removing file %s' % path)
    time.sleep(10)  # Pretending that I'm removing the file...
    print('%s is removed' % path)


not_graceful = sys.argv[1:] and sys.argv[1] == '--not-graceful'
if not_graceful:
    print('I will _not_ be shut down gracefully...')
else:
    print('I will be shut down gracefully... (default behavior)')

with ThreadPoolExecutor(1) as executor:
    futures = [executor.submit(remove_file, path) for path in 'abcd']
    try:
        for future in as_completed(futures):
            future.result()
    except KeyboardInterrupt:
        if not_graceful:
            executor._threads.clear()
            concurrent.futures.thread._threads_queues.clear()
        raise

Owner Author

@clchiou clchiou commented Mar 19, 2015

With a graceful shutdown, the script keeps removing the remaining files after you hit Ctrl-C.

$ python3 non_graceful_shutdown.py 
I will be shut down gracefully... (default behavior)
Removing file a
^Ca is removed
Removing file b
b is removed
Removing file c
c is removed
Removing file d
d is removed
Traceback (most recent call last):
  File "non_graceful_shutdown.py", line 25, in <module>
    for future in as_completed(futures):
  File "/usr/lib/python3.4/concurrent/futures/_base.py", line 215, in as_completed
    waiter.event.wait(wait_timeout)
  File "/usr/lib/python3.4/threading.py", line 552, in wait
    signaled = self._cond.wait(timeout)
  File "/usr/lib/python3.4/threading.py", line 289, in wait
    waiter.acquire()
KeyboardInterrupt

A non-graceful shutdown won't.

$ python3 non_graceful_shutdown.py --not-graceful
I will _not_ be shut down gracefully...
Removing file a
^CTraceback (most recent call last):
  File "non_graceful_shutdown.py", line 25, in <module>
    for future in as_completed(futures):
  File "/usr/lib/python3.4/concurrent/futures/_base.py", line 215, in as_completed
    waiter.event.wait(wait_timeout)
  File "/usr/lib/python3.4/threading.py", line 552, in wait
    signaled = self._cond.wait(timeout)
  File "/usr/lib/python3.4/threading.py", line 289, in wait
    waiter.acquire()
KeyboardInterrupt

@gaborbernat gaborbernat commented Feb 25, 2016

Wouldn't executor.shutdown(wait=False) do the same?

@megapctr megapctr commented Oct 14, 2016

@gaborbernat no, because shutdown(wait=False) only returns without waiting. It does not cancel pending jobs or stop running ones, and the process still waits for them to finish before it exits.
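
As a rough illustration of the point above, the sketch below uses a single worker and three short jobs. The names `slow_job` and `run_demo` are made up for this example. It assumes that pending jobs have to be cancelled explicitly with `Future.cancel()`, and that a job which is already running cannot be cancelled this way.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def slow_job(name, log, started, delay=0.2):
    started.set()        # tell the main thread that a job is running
    time.sleep(delay)    # stand-in for real work
    log.append(name)
    return name


def run_demo():
    log = []
    started = threading.Event()
    executor = ThreadPoolExecutor(max_workers=1)
    futures = [executor.submit(slow_job, name, log, started) for name in 'abc']

    started.wait()  # job 'a' is now running; 'b' and 'c' are still pending
    before = time.monotonic()
    executor.shutdown(wait=False)  # returns right away and cancels nothing
    shutdown_seconds = time.monotonic() - before

    # Explicit cancellation: succeeds only for jobs that have not started.
    cancelled = [future.cancel() for future in futures]

    # The running job still finishes; wait for it so the demo is deterministic.
    futures[0].result()
    return shutdown_seconds, cancelled, log


if __name__ == '__main__':
    print(run_demo())
```

On Python 3.9 and later, `shutdown(wait=False, cancel_futures=True)` cancels the pending jobs in one call, but the running job is still left to finish.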

@nk9 nk9 commented Apr 20, 2020

Works like a charm. Thanks!

@clchiou clchiou commented Apr 20, 2020

Thanks.

A very late reply to @gaborbernat: the thread executor module joins the executor threads at two levels. The first is in the shutdown function, which I disabled with executor._threads.clear(). shutdown(wait=False) has the same effect, but executor.__exit__ always calls shutdown(wait=True), so the process would still block at the end of the with block. That's why I used executor._threads.clear(). If you are not using the executor's context manager, shutdown(wait=False) should just work.
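
The difference between the context manager and a manual shutdown(wait=False) can be timed with a small sketch like the one below. The function names and the 0.3 second delay are arbitrary choices for this illustration, not part of the gist.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def nap(seconds):
    time.sleep(seconds)  # stand-in for a slow job
    return seconds


def time_with_block(seconds=0.3):
    start = time.monotonic()
    with ThreadPoolExecutor(max_workers=1) as executor:
        executor.submit(nap, seconds)
    # Leaving the with block called shutdown(wait=True), so the job is done.
    return time.monotonic() - start


def time_manual_shutdown(seconds=0.3):
    start = time.monotonic()
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(nap, seconds)
    executor.shutdown(wait=False)  # does not wait for the running job
    elapsed = time.monotonic() - start
    future.result()  # wait here only so the demo leaves no work behind
    return elapsed


if __name__ == '__main__':
    print('with block:      %.2fs' % time_with_block())
    print('manual shutdown: %.2fs' % time_manual_shutdown())
```

The first timing should be roughly the job duration and the second close to zero, which matches the explanation above. Note that the interpreter still waits for unfinished jobs when the process exits.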

The second join point is the atexit callback. I think it can only be disabled with concurrent.futures.thread._threads_queues.clear(). (Note that this disables joining of threads of all executors.)
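
For comparison, here is a sketch of a different technique that avoids the private attributes entirely: cooperative cancellation. It is not what the gist does. It assumes the job can be split into small steps that check a flag, and the names `stop_requested`, `steps`, `step_seconds`, and `interrupt_after` are invented for this example. Ctrl-C is simulated by raising KeyboardInterrupt in the main thread.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

stop_requested = threading.Event()  # hypothetical flag, not part of the gist


def remove_file(path, steps=10, step_seconds=0.05):
    for _ in range(steps):
        if stop_requested.is_set():
            return '%s interrupted' % path
        time.sleep(step_seconds)  # stand-in for one slice of real work
    return '%s removed' % path


def run(paths, interrupt_after=None):
    stop_requested.clear()
    with ThreadPoolExecutor(max_workers=1) as executor:
        futures = [executor.submit(remove_file, path) for path in paths]
        try:
            if interrupt_after is not None:
                time.sleep(interrupt_after)
                raise KeyboardInterrupt  # simulated Ctrl-C, for the demo only
            return [future.result() for future in futures]
        except KeyboardInterrupt:
            for future in futures:
                future.cancel()   # drops jobs that have not started yet
            stop_requested.set()  # asks the running job to return early
            # A real script would probably re-raise here, as the gist does.
            return [f.result() for f in futures if not f.cancelled()]


if __name__ == '__main__':
    print(run('abcd', interrupt_after=0.12))
```

The trade-off is that the worker function has to cooperate. A job blocked in a single long call, such as the time.sleep(10) in the gist, would not notice the flag until that call returns.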

A side note: I haven't dug into the revision history, but it is interesting that the executor module tries very hard to join the threads, and yet it sets all threads to be daemons.

@alex-ber alex-ber commented Apr 30, 2020

@clchiou, thanks for your late reply :-)

@sergshner sergshner commented Sep 18, 2020

In Python >= 3.7 the following part raises an error if the concurrent.futures.thread submodule has not been imported explicitly:

    except KeyboardInterrupt:

        if not_graceful:
            executor._threads.clear()
            concurrent.futures.thread._threads_queues.clear()
        raise

For example, in my script

Traceback (most recent call last):
  File "mssql_backup_threaded.py", line 196, in <module>
    concurrent.futures.thread._threads_queues.clear()
  File "C:\Users\user\AppData\Local\Programs\Python\Python38-32\lib\concurrent\futures\__init__.py", line 53, in __getattr__
    raise AttributeError(f"module {__name__} has no attribute {name}")

AttributeError: module concurrent.futures has no attribute thread

To work around this issue, add the following line to the imports of your program:

from concurrent.futures import thread
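
A short check of why this import helps: importing a submodule also binds it as an attribute of its parent package, so concurrent.futures.thread becomes reachable afterwards. The variable name `same_module` is only for this illustration. The `import concurrent.futures.thread` line at the top of the gist has the same effect.

```python
import concurrent.futures
from concurrent.futures import thread  # also sets concurrent.futures.thread

# Both spellings now refer to the same module object.
same_module = concurrent.futures.thread is thread
print(same_module)
```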

@White2001Offl White2001Offl commented Apr 3, 2021

Thanks for sharing this. Works perfectly as I wanted. ❤️
