Created
April 9, 2018 10:57
-
-
Save Irfy/7e47d49c9a71b529a334cb0ba49a2471 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import inspect | |
import ctypes | |
from time import sleep | |
from threading import Thread | |
def _async_raise(tid, exctype):
    """Asynchronously raise ``exctype`` in the thread identified by ``tid``.

    Args:
        tid: Identifier of the target thread (e.g. ``Thread.ident``).
        exctype: Exception class to raise. If an instance is passed, its
            class is used instead.

    Raises:
        ValueError: If ``PyThreadState_SetAsyncExc`` reports that no thread
            state was modified (return value 0).
        SystemError: If it reports more than one modified thread state; the
            pending exception is reverted before this is raised.
    """
    if not inspect.isclass(exctype):
        exctype = type(exctype)

    set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
    # Wrap the id once and reuse it for both calls. Without declared
    # argtypes, ctypes marshals a bare Python int as a C ``int``, which could
    # truncate the id on platforms where ``long`` is wider than ``int`` and
    # make the revert call below target a different (or no) thread.
    c_tid = ctypes.c_long(tid)

    res = set_async_exc(c_tid, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    if res != 1:
        # If it returns a number greater than one, you're in trouble, and
        # you should call it again with exc=NULL to revert the effect.
        set_async_exc(c_tid, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")
def stop_thread(thread):
    """Ask ``thread`` to terminate by raising ``SystemExit`` inside it.

    Args:
        thread: Object exposing an ``ident`` attribute (such as a
            ``threading.Thread``); that identifier selects the target thread.
    """
    target_id = thread.ident
    _async_raise(target_id, SystemExit)
class myReader(Thread):
    """Thread that echoes every line read from ``/dev/tty``."""

    def run(self):
        """Read ``/dev/tty`` line by line and print each line.

        A ``SystemExit`` raised while reading is caught and reported instead
        of propagating out of the thread.
        """
        with open('/dev/tty', encoding='ascii') as terminal:
            try:
                for line in terminal:
                    print(line)
            except SystemExit:
                print("SystemExit detected")
        # NOTE(review): the original indentation was lost; this message is
        # assumed to be printed on every exit path, after the stream has
        # been closed -- confirm against the author's intent.
        print("cleaning up...")
# Start the terminal reader in a background thread.
# NOTE: this rebinding shadows the ``myReader`` class with its instance; the
# name is kept as-is so the module-level names stay unchanged.
myReader = myReader()
myReader.start()

# Idle until the user presses Ctrl-C. The ``try`` wraps the whole loop so
# that a KeyboardInterrupt delivered between iterations (not only during the
# ``sleep`` call) is still handled and the reader thread gets signalled.
try:
    while True:
        sleep(0.1)  # Time in seconds.
except KeyboardInterrupt:
    print("KeyboardInterrupt detected, signaling thread")

# Inject SystemExit into the reader and wait for it to finish.
# NOTE(review): the asynchronous exception is presumably only acted on once
# the reader thread is executing Python bytecode again, so ``join`` may block
# until the pending terminal read returns -- confirm on the target platform.
stop_thread(myReader)
print("Waiting on thread join...")
myReader.join()
print("Joined.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment