Last active
April 20, 2018 10:10
-
-
Save J3ronimo/b4f9b48b5dec7ea1227f167d22067494 to your computer and use it in GitHub Desktop.
Interruptible threading.Thread subclass (InterruptableThread), based on https://gist.github.com/liuw/2407154
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
import ctypes | |
import time | |
class InterruptableThread(threading.Thread):
    """Thread whose ``run()`` can be aborted from another thread.

    Based on https://gist.github.com/liuw/2407154.

    Calling :meth:`interrupt` (typically from the main thread) asks the
    interpreter to raise :class:`ThreadInterrupt` inside this thread. The
    wrapper installed by :meth:`start` swallows that exception, so ``run()``
    simply stops, and :meth:`finalize` is called afterwards in every case.

    Blocking I/O or plain ``time.sleep`` calls are not interruptable this way
    and will delay the stopping of the thread. Use :meth:`sleep` as a
    workaround.
    """

    class ThreadInterrupt(Exception):
        """Internal exception raised by interrupt(), suppressed around run()."""

    def _suppress_threadinterrupts(self, run):
        """Wrap *run* so that ThreadInterrupts are swallowed silently.

        The returned zero-argument callable invokes ``run()``, ignores a
        ``ThreadInterrupt`` escaping from it, and always calls
        ``self.finalize()`` afterwards. Any other exception still propagates
        (after ``finalize()`` has run).
        """
        def wrapped_func():
            try:
                run()
            except self.ThreadInterrupt:
                # Deliberate: an interrupt is the expected way to stop run().
                pass
            finally:
                self.finalize()
        return wrapped_func

    def start(self):
        """Install the interrupt-suppressing wrapper, then start the thread.

        Thread.start may only be called once, so we can safely apply the
        wrapper to ``self.run`` here. The wrapped callable is stored as an
        instance attribute, shadowing the class-level ``run``.
        """
        self.run = self._suppress_threadinterrupts(self.run)
        super().start()

    def run(self):
        """Thread body; this should be implemented in a subclass.

        The default implementation idles forever in interruptable sleeps.
        """
        while True:
            self.sleep(60)

    def sleep(self, seconds, interval=0.1):
        """Sleep for *seconds*, in slices of at most *interval* seconds.

        time.sleep is not interruptable when called from a thread in Windows.
        Therefore the wait is broken up into smaller blocks, after each of
        which a pending ThreadInterrupt can be raised.

        The deadline is tracked with the monotonic clock so that adjustments
        of the system (wall) clock cannot shorten or lengthen the wait.
        """
        deadline = time.monotonic() + seconds
        while deadline - time.monotonic() > interval:
            time.sleep(interval)
        # Sleep off the remainder; clamp at 0 in case the deadline has passed.
        time.sleep(max(deadline - time.monotonic(), 0))

    def interrupt(self):
        """Inject a ThreadInterrupt into this thread to stop self.run().

        Raises:
            AssertionError: the thread has not been started yet.
            ValueError: the interpreter found no thread with this ident.
            SystemError: more than one thread state was modified; the
                pending exception is cleared again before raising.
        """
        # Raised explicitly (instead of using an ``assert`` statement) so the
        # check is not stripped when Python runs with -O.
        if not self.ident:
            raise AssertionError("Thread has not been started yet.")
        # PyThreadState_SetAsyncExc takes the id as an unsigned long
        # (since Python 3.7).
        tid = ctypes.c_ulong(self.ident)
        exc = ctypes.py_object(self.ThreadInterrupt)
        ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, exc)
        if ret == 0:
            raise ValueError("Invalid thread ID")
        elif ret > 1:
            # fail => reset the exception to clean up and prevent strange effects
            ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
            raise SystemError("PyThreadState_SetAsyncExc failed")

    def finalize(self):
        """Hook called after the thread ends or gets interrupted.

        Does nothing by default; override in a subclass for cleanup.
        """
        pass
def test_interruptable_thread():
    """Demo: start a worker that sleeps 5 s and interrupt it after 1 s.

    Progress is reported on stdout. With the interrupt in place the worker
    is stopped during its sleep, so "Thread ends regularly" is not expected
    to be printed, while the finalize message still is.
    """

    class _DemoThread(InterruptableThread):
        """Worker that announces each stage of its life cycle."""

        def interrupt(self):
            print("Thread is interrupted")
            super().interrupt()

        def finalize(self):
            print("Thread is finalized")

        def run(self):
            print("Thread starts")
            self.sleep(5)
            print("Thread ends regularly")

    worker = _DemoThread()
    worker.start()
    # Give the worker time to get into its interruptable sleep.
    time.sleep(1)
    worker.interrupt()  # comment this out for regular thread end
if __name__ == "__main__":
    # Run the demo only when executed as a script, not when imported.
    test_interruptable_thread()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment