@liuw
Created April 17, 2012 16:02
Nasty hack to raise exception for other threads
#!/usr/bin/env python
# liuw
# Nasty hack to raise exception for other threads

import ctypes # Calm down, this has become standard library since 2.5
import threading
import time

NULL = 0

def ctype_async_raise(thread_obj, exception):
    found = False
    target_tid = 0
    for tid, tobj in threading._active.items():
        if tobj is thread_obj:
            found = True
            target_tid = tid
            break

    if not found:
        raise ValueError("Invalid thread object")

    ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(target_tid, ctypes.py_object(exception))
    # ref: http://docs.python.org/c-api/init.html#PyThreadState_SetAsyncExc
    if ret == 0:
        raise ValueError("Invalid thread ID")
    elif ret > 1:
        # Huh? Why would we notify more than one threads?
        # Because we punch a hole into C level interpreter.
        # So it is better to clean up the mess.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(target_tid, NULL)
        raise SystemError("PyThreadState_SetAsyncExc failed")
    print "Successfully set asynchronized exception for", target_tid

def f():
    try:
        while True:
            time.sleep(1)
    finally:
        print "Exited"

t = threading.Thread(target=f)
t.start()
print "Thread started"
print t.isAlive()
time.sleep(5)
ctype_async_raise(t, SystemExit)
t.join()
print t.isAlive()

@glenfant

glenfant commented Feb 9, 2014

Hi,

According to http://docs.python.org/c-api/init.html#PyThreadState_SetAsyncExc, the target_tid parameter is expected to be a C long. As a consequence, the call on line 23 does not work as expected on some CPython builds, including mine (CPython 2.6, 2.7 and 3.3 on Mac OS X Mavericks): ret is always 0.

So I replaced your line 23 with the following, which works for me and should work on any CPython build:

    ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(target_tid),
                                                     ctypes.py_object(exception))
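
Why the wrapper matters can be seen without touching any thread state. The following is a minimal sketch, assuming Python 3; `fits_in_c_int` is a hypothetical helper and not part of the gist. When a foreign function has no `argtypes` declared, ctypes passes plain Python integers as the platform's C int and masks off whatever does not fit, so a pointer-sized thread id may arrive changed.

```python
import ctypes
import threading


def fits_in_c_int(value):
    """Return True if value survives a round trip through a C int."""
    # ctypes does no overflow checking: the value is masked to the C type.
    return ctypes.c_int(value).value == value


tid = threading.get_ident()
print("sizeof(int) :", ctypes.sizeof(ctypes.c_int))
print("sizeof(long):", ctypes.sizeof(ctypes.c_long))
print("this thread's id fits in a C int:", fits_in_c_int(tid))
```

Where the last line prints True, the unwrapped call may happen to work; where it prints False, the id has to be wrapped explicitly as in the comment above.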

@stuaxo

stuaxo commented Mar 1, 2014

The cast to long is definitely a 64-bit thing; it fixes the problem on 64-bit Ubuntu too.

@jhcook

jhcook commented Aug 25, 2014

@glenfant: This worked and saved me loads of time on Mavericks! Ta

@andresriancho

@jhcook, @stuaxo, @liuw: is anyone else having issues with locks and PyThreadState_SetAsyncExc? See glenfant/stopit#6

@rosensama

Can't you use thread_obj.ident instead of looping through threading._active? Worried about id reuse?
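
A minimal Python 3 sketch of the `ident` variant follows. `async_raise` is a hypothetical name. The sketch assumes CPython 3.7 or later, where the documented type of the id parameter is unsigned long (hence `c_ulong` rather than `c_long`), and it does nothing about id reuse beyond refusing threads that are not alive.

```python
import ctypes
import threading
import time


def async_raise(thread, exc_type):
    """Ask CPython to raise exc_type in thread (hypothetical helper)."""
    if not thread.is_alive():
        raise ValueError("Thread is not running")
    tid = ctypes.c_ulong(thread.ident)  # wrap explicitly, see the comments above
    ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exc_type))
    if ret == 0:
        raise ValueError("Invalid thread ID")
    if ret > 1:
        # More than one thread state was modified: undo it (None is passed as NULL).
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")


def worker():
    try:
        while True:
            time.sleep(0.05)  # short sleeps keep the thread responsive
    finally:
        print("Exited")


t = threading.Thread(target=worker)
t.start()
time.sleep(0.2)
async_raise(t, SystemExit)
t.join()
print(t.is_alive())
```

The demo mirrors the one in the gist, with `is_alive()` replacing the Python 2 spelling `isAlive()`.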

@nazavode

nazavode commented Dec 1, 2015

Forked and ported to Python 3.4. Works pretty well, even if it remains one of the dirtiest hacks ever seen.

@bfrggit

bfrggit commented Aug 5, 2016

This will not work if the thread to kill is blocked on I/O or in sleep(very_long_time).

@thanda43X

This helped me more than you can imagine. Kudos

@csaftoiu

This doesn't seem to work if the thread in question is doing a time.sleep... anyone else notice this?

@wyl8899

wyl8899 commented Mar 25, 2020

> This doesn't seem to work if the thread in question is doing a time.sleep... anyone else notice this?

I believe this is because the exception is "async", as the API name _SetAsyncExc suggests: it is only raised once the target thread gets back to executing Python code.

In my use case, I call a subprocess in my target thread, and I have to change p.wait() to a loop like

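while time.time() < deadline:
    try:
        p.wait(0.1)
        break  # the subprocess has exited, stop polling
    except subprocess.TimeoutExpired:
        pass  # still running; loop again so a pending exception can be raised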

to make the thread responsive to the exception.

Works pretty well so far, thank you so much.
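
The same slicing idea can be applied to long sleeps. This is a minimal sketch, assuming Python 3; `interruptible_sleep` and its `step` parameter are hypothetical names. It trades one long blocking call for many short ones, so the thread returns to Python code at least every `step` seconds.

```python
import time


def interruptible_sleep(seconds, step=0.1):
    """Sleep for roughly `seconds`, waking up every `step` seconds."""
    deadline = time.monotonic() + seconds
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return
        # Each short sleep returns to the interpreter, where a pending
        # asynchronous exception can be raised.
        time.sleep(min(step, remaining))
```

A worker would call `interruptible_sleep(60)` where it previously called `time.sleep(60)`; the worst-case delay before the exception is seen is then about `step` seconds.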

@earonesty

Note: this won't interrupt sockets or sleeps. For that you still need to send a signal.

@klamann

klamann commented Dec 6, 2021

This got a lot easier in Python 3.8 or higher, since Thread.native_id was introduced:

import ctypes
from threading import Thread
from typing import Type

def raise_exception_in_thread(t: Thread, e: Type[BaseException]):
    ctypes.pythonapi.PyThreadState_SetAsyncExc(t.native_id, ctypes.py_object(e))

Edit: I tested this on Windows, but t.native_id does not seem to work for me on Ubuntu. Use t.ident instead! This has the additional advantage that it works on Python 3.3+.

import ctypes
from threading import Thread
from typing import Type

def raise_exception_in_thread(t: Thread, e: Type[BaseException]):
    ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(t.ident), ctypes.py_object(e))
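
A likely explanation for the difference: `Thread.ident` is the identifier Python itself keeps for the thread (the same value `threading.get_ident()` returns inside it), which is what PyThreadState_SetAsyncExc matches against, while `Thread.native_id` is the thread ID assigned by the operating system kernel. The two need not be equal. The sketch below, which assumes Python 3.8+ and a platform that provides `native_id`, only prints both values for comparison.

```python
import threading

release = threading.Event()
worker = threading.Thread(target=release.wait)
worker.start()

# ident: Python's thread identifier; native_id: kernel-assigned thread ID.
print("ident:    ", worker.ident)
print("native_id:", getattr(worker, "native_id", None))

release.set()
worker.join()
```

If the two numbers differ on your system, passing `native_id` to the C API will not find the thread.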

@Pithikos

Pithikos commented Dec 13, 2021

I couldn't figure out an easy way to use exceptions with custom messages, so below is a workaround for non-builtin exceptions.

import ctypes
import threading

def raise_exception_in_thread(thread_obj, exception_cls, message=None):

    # Monkey-patch exception with a default message since PyThreadState_SetAsyncExc can only take a class
    if message:
        def __init__(self, *args, **kwargs):
            nonlocal message
            super(exception_cls, self).__init__(message, **kwargs)
        exception_cls.__init__ = __init__

    found = False
    target_tid = 0
    for tid, tobj in threading._active.items():
        if tobj is thread_obj:
            found = True
            target_tid = tid
            break

    if not found:
        raise ValueError("Invalid thread object")

    ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(target_tid), ctypes.py_object(exception_cls))
    # ref: http://docs.python.org/c-api/init.html#PyThreadState_SetAsyncExc
    if ret == 0:
        raise ValueError("Invalid thread ID")
    elif ret > 1:
        # Huh? Why would we notify more than one threads?
        # Because we punch a hole into C level interpreter.
        # So it is better to clean up the mess.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(target_tid), None)
        raise SystemError("PyThreadState_SetAsyncExc failed")
    print(f"Successfully set asynchronized exception for {target_tid}")

@axxyaan

axxyaan commented Jul 31, 2022

How about the following approach? If you would like to raise ValueError("Error Msg"), you create a class with ExceptInstance. This creates a subclass of ValueError whose arguments are passed in by __init__. This should work in most circumstances.

def ExceptInstance(except_class, *args, **kwds):
	class Exceptor(except_class):
		def __init__(self):
			super().__init__(*args, **kwds)

	return Exceptor

def program():
	try:
		thrower()
	except ValueError as err:
		print('ValueError', str(err))

def thrower():
	Exceptor = ExceptInstance(ValueError, 'Threw a value_error')
	raise Exceptor

program()
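
Combined with the asynchronous raise, the subclass approach might look like the sketch below. It assumes Python 3.7+ on CPython; `bind_args` and `async_raise` are hypothetical names. It relies on the interpreter instantiating the scheduled class without arguments, which is why the subclass needs a no-argument `__init__`. Unlike monkey-patching, it leaves the original exception class untouched.

```python
import ctypes
import threading
import time


def bind_args(exc_class, *args):
    """Return a subclass of exc_class whose no-argument constructor uses args."""
    class Bound(exc_class):
        def __init__(self):
            super().__init__(*args)
    Bound.__name__ = exc_class.__name__  # keep tracebacks readable
    return Bound


def async_raise(thread, exc_class):
    """Schedule exc_class in thread (hypothetical helper, CPython only)."""
    tid = ctypes.c_ulong(thread.ident)
    ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exc_class))
    if ret == 0:
        raise ValueError("Invalid thread ID")
    if ret > 1:
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)  # undo
        raise SystemError("PyThreadState_SetAsyncExc failed")


captured = []


def worker():
    try:
        while True:
            time.sleep(0.05)
    except ValueError as err:
        captured.append(str(err))  # the bound message arrives here


t = threading.Thread(target=worker)
t.start()
time.sleep(0.2)
async_raise(t, bind_args(ValueError, "stop requested"))
t.join()
print(captured)
```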

@CristiFati

CristiFati commented Aug 19, 2022

@dstathis

@liuw Do you give permission to use this code as is?

@liuw
Author

liuw commented Apr 11, 2023

@dstathis Yes. Do whatever you like with the code.

@bmusq

bmusq commented Apr 11, 2024

@liuw May I add a small contribution?

This hack is actually a great help! Here is a version for a multiprocessing context:

The idea is to have the main process start an observer thread connected by a pipe to a third, independent process, and then proceed with non-blocking operations.
The third process can interrupt the main process whenever it needs to, by means of the pipe and the observer thread.
If the main process has to wait on sockets or sleep, I strongly advise implementing a timeout loop, which is good practice anyway.

import ctypes
from time import sleep
from multiprocessing import Process, Pipe
from threading import Thread, get_ident
from typing import Type

def raise_exception_in_thread(tid: int, e: Type[BaseException]):
    ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), ctypes.py_object(e))
    if ret:
        raise KeyboardInterrupt
    else:
        raise ValueError("Invalid thread ID")

def interrupt_process(send_conn, tid):
    print("Starting process interrupter")
    sleep(2)
    send_conn.send(b"kill")
    print("Ending process interrupter")

def observer_thread(recv_conn, tid):
    print("Starting thread observer")
    try:
        while True:
            if recv_conn.poll(timeout=0.5):
                raise_exception_in_thread(tid, KeyboardInterrupt)
    except KeyboardInterrupt:
        print("Ending thread observer")

def main():
    recv_conn, send_conn = Pipe()
    tid = get_ident()
    p = Process(target=interrupt_process, args=(send_conn, tid))
    p.start()

    t = Thread(target=observer_thread, args=(recv_conn, tid))
    t.start()

    try:
        while True:
            sleep(1)
    except KeyboardInterrupt:
        print("Main interruption")
    finally:
        p.join()
        t.join()

if __name__ == "__main__":
    main()

PS: I did not sync the printing part.
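
For the socket case, the timeout loop mentioned above could be sketched as follows. This is only an illustration under assumptions: `recv_with_polling` and `poll_interval` are hypothetical names, and the local socket pair stands in for a real connection.

```python
import socket


def recv_with_polling(sock, nbytes, poll_interval=0.5):
    """Wait for data, but return to Python code every poll_interval seconds."""
    sock.settimeout(poll_interval)
    while True:
        try:
            return sock.recv(nbytes)
        except socket.timeout:
            # Nothing arrived yet. Going around the loop gives a pending
            # asynchronous exception a chance to be raised.
            continue


a, b = socket.socketpair()
b.sendall(b"ping")
print(recv_with_polling(a, 4))
a.close()
b.close()
```

A shorter `poll_interval` makes the thread react faster to the exception at the cost of more wake-ups.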
