Skip to content

Instantly share code, notes, and snippets.

@Irfy
Created April 9, 2018 10:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Irfy/7e47d49c9a71b529a334cb0ba49a2471 to your computer and use it in GitHub Desktop.
Save Irfy/7e47d49c9a71b529a334cb0ba49a2471 to your computer and use it in GitHub Desktop.
import inspect
import ctypes
from time import sleep
from threading import Thread
def _async_raise(tid, exctype):
    """Asynchronously raise an exception in the thread identified by *tid*.

    The request is made through the CPython C-API function
    ``PyThreadState_SetAsyncExc`` via ``ctypes.pythonapi``.

    Args:
        tid: Identifier of the target thread (e.g. ``Thread.ident``).
        exctype: Exception class to raise. If an instance is passed, its
            class is used instead.

    Raises:
        ValueError: If the call returns 0, i.e. *tid* matched no thread.
        SystemError: If the call returns a value greater than 1; the
            pending exception is reverted before raising.
    """
    if not inspect.isclass(exctype):
        exctype = type(exctype)

    # Wrap the id once so BOTH calls below pass it with the same C type.
    # A bare Python int is handed to a foreign function as a C ``int`` and
    # masked to fit, which can truncate a wide thread id.
    thread_id = ctypes.c_long(tid)

    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        thread_id, ctypes.py_object(exctype)
    )
    if res == 0:
        raise ValueError("invalid thread id")
    if res != 1:
        # If it returns a number greater than one, you're in trouble, and
        # you should call it again with exc=NULL to revert the effect.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")
def stop_thread(thread):
    """Ask *thread* to terminate by raising ``SystemExit`` inside it.

    Args:
        thread: Thread object whose ``ident`` attribute identifies the
            target of the asynchronous exception.
    """
    target_id = thread.ident
    _async_raise(target_id, SystemExit)
class myReader(Thread):
    """Thread that echoes lines read from ``/dev/tty``."""

    def run(self):
        """Print every line read from ``/dev/tty``.

        The loop ends when the stream is exhausted or when ``SystemExit``
        is raised in this thread; the latter is caught and reported.
        """
        # NOTE(review): the source this was recovered from had lost its
        # indentation; the final "cleaning up..." print is assumed to run
        # after the try/except on every exit path -- confirm.
        with open('/dev/tty', encoding='ascii') as tty:
            try:
                for line in tty:
                    print(line)
            except SystemExit:
                print("SystemExit detected")
            print("cleaning up...")
# Demo driver: start the reader thread, wait for Ctrl-C, then stop and join it.
# NOTE: this rebinds the module-level name ``myReader`` from the class to its
# single instance; the name is kept as-is so the module's names do not change.
myReader = myReader()
myReader.start()
try:
    # Only keep waiting while the reader is still running; if it ended on
    # its own there is nothing left to signal.
    while myReader.is_alive():
        sleep(0.1)  # Time in seconds.
except KeyboardInterrupt:
    print("KeyboardInterrupt detected, signaling thread")
    try:
        stop_thread(myReader)
    except ValueError:
        # Raised as "invalid thread id" when no thread matches the id,
        # presumably because the reader finished just before the signal
        # was sent; in that case there is nothing to stop.
        pass
print("Waiting on thread join...")
myReader.join()
print("Joined.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment