Skip to content

Instantly share code, notes, and snippets.

@J3ronimo
Last active April 20, 2018 10:10
Show Gist options
  • Save J3ronimo/b4f9b48b5dec7ea1227f167d22067494 to your computer and use it in GitHub Desktop.
Save J3ronimo/b4f9b48b5dec7ea1227f167d22067494 to your computer and use it in GitHub Desktop.
Interruptable threading.Thread, based on https://gist.github.com/liuw/2407154
import threading
import ctypes
import time
class InterruptableThread(threading.Thread):
    """Interruptable threading.Thread.

    Based on: https://gist.github.com/liuw/2407154

    Calling this thread's interrupt() method from another thread (typically
    the main thread) raises a ThreadInterrupt inside run(), which stops the
    execution. The exception is swallowed by the wrapper installed in
    start(), and finalize() is called afterwards in every case.

    Blocking I/O or calls to time.sleep are not interruptable this way and
    will delay the stopping of the thread. Use self.sleep as a workaround.
    """

    class ThreadInterrupt(Exception):
        """Internal exception type, raised by interrupt(), suppressed around run()."""

    def _suppress_threadinterrupts(self, run):
        """Wrap *run* so that ThreadInterrupts are swallowed silently.

        Args:
            run: Zero-argument callable (the bound run() method).

        Returns:
            A zero-argument callable that invokes *run*, ignores a
            ThreadInterrupt escaping from it, and always calls
            self.finalize() afterwards. Any other exception still
            propagates after finalize() has run.
        """
        def wrapped_func():
            try:
                run()
            except self.ThreadInterrupt:
                # Expected way of stopping the thread; nothing to report.
                pass
            finally:
                self.finalize()
        return wrapped_func

    def start(self):
        """Install the ThreadInterrupt-suppressing wrapper and start the thread.

        Thread.start may only get called once, so we can safely apply the
        wrapper to self.run here. Wrapping at this point (instead of in
        __init__) also covers run() implementations of subclasses.
        """
        self.run = self._suppress_threadinterrupts(self.run)
        super().start()

    def run(self):
        """Thread body; this should be implemented in a subclass.

        The default implementation idles forever in interruptable sleeps.
        """
        while True:
            self.sleep(60)

    def sleep(self, seconds, interval=0.1):
        """Sleep for *seconds*, split into slices of at most *interval* seconds.

        time.sleep is not interruptable when called from a Thread in Windows.
        Therefore, break it up in smaller blocks after which exception
        handling can happen.

        Args:
            seconds: Total time to sleep.
            interval: Maximum length of a single uninterruptable slice.
        """
        # Use the monotonic clock so that adjustments of the system time
        # (NTP, DST, manual changes) cannot shorten or prolong the sleep.
        deadline = time.monotonic() + seconds
        while deadline - time.monotonic() > interval:
            time.sleep(interval)
        # Sleep the remainder; clamp at 0 in case the deadline already passed.
        time.sleep(max(deadline - time.monotonic(), 0))

    def interrupt(self):
        """Inject a ThreadInterrupt into this thread to stop self.run().

        Raises:
            RuntimeError: If the thread has not been started yet.
            ValueError: If the interpreter does not know the thread id
                (PyThreadState_SetAsyncExc modified no thread state).
            SystemError: If more than one thread state was modified.
        """
        # Explicit check instead of `assert`, which is stripped under -O.
        if self.ident is None:
            raise RuntimeError("Thread has not been started yet.")
        # The C signature takes the id as `unsigned long`.
        tid = ctypes.c_ulong(self.ident)
        exc = ctypes.py_object(self.ThreadInterrupt)
        ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, exc)
        if ret == 0:
            raise ValueError("Invalid thread ID")
        elif ret > 1:
            # fail => reset the exception to clean up and prevent strange effects
            ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
            raise SystemError("PyThreadState_SetAsyncExc failed")

    def finalize(self):
        """Hook called after the thread ends or gets interrupted.

        Does nothing by default; override in a subclass for cleanup.
        """
        pass
def test_interruptable_thread():
    """Manual demo: start a thread that sleeps 5 s and interrupt it after 1 s.

    The thread announces each stage of its life cycle on stdout.
    """

    class DemoThread(InterruptableThread):
        """Thread that prints when it starts, ends, is interrupted or finalized."""

        def interrupt(self):
            print("Thread is interrupted")
            super().interrupt()

        def finalize(self):
            print("Thread is finalized")

        def run(self):
            print("Thread starts")
            self.sleep(5)
            print("Thread ends regularly")

    worker = DemoThread()
    worker.start()
    # Give the thread some time to get going before stopping it.
    time.sleep(1)
    worker.interrupt()  # comment this out for regular thread end
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    test_interruptable_thread()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment