Skip to content

Instantly share code, notes, and snippets.

@Makman2
Last active October 9, 2015 21:53
Show Gist options
  • Save Makman2/88ac3f8ce5bfcc5237a5 to your computer and use it in GitHub Desktop.
Save Makman2/88ac3f8ce5bfcc5237a5 to your computer and use it in GitHub Desktop.
WIP-Prototype for InterruptableThread
# Prototype for an InterruptableProcess.
# 8.10.2015 --> Makman2
import ctypes
import multiprocessing
import threading
def _wait_for_single(*events, poll_interval=0.05):
    """
    Waits for at least one event being set.

    Returns immediately if an event was set before invoking this function.

    The events are polled instead of being hooked. Replacing `set` on an
    event instance only affects that very object inside the current process,
    so a `multiprocessing.Event` that gets set from another process would
    never trigger such a hook. Polling `is_set()` works for
    `threading.Event` and `multiprocessing.Event` alike and has no
    check-then-hook race.

    :param events:        The events to wait for. Each one must provide
                          `is_set()`; the first one also `wait(timeout)`.
    :param poll_interval: Maximum time in seconds to block between two
                          checks of the events.
    :raises ValueError:   Raised when no event was supplied (the wait could
                          never be satisfied).
    """
    if not events:
        raise ValueError("At least one event is required.")

    while not any(event.is_set() for event in events):
        # Blocking on the first event keeps the loop from busy-spinning and
        # still wakes up instantly when that particular event gets set. The
        # other events are noticed at the latest after `poll_interval`.
        events[0].wait(poll_interval)
class InterruptableProcess(multiprocessing.Process):
    """
    A `multiprocessing.Process` with interrupt capability.

    The target does not run in the main thread of the child process but in a
    worker thread. The main thread of the child process supervises that
    worker and raises a `KeyboardInterrupt` inside it as soon as the parent
    process requests an interrupt via `interrupt()`.
    """

    def __init__(self,
                 group=None,
                 target=None,
                 name=None,
                 args=(),
                 kwargs=None,
                 daemon=None):
        """
        Instantiates a new InterruptableProcess.

        See also `threading.Thread` or `multiprocessing.Process` class.

        :param group:  The process group. This value exists for sole
                       compatibility with the `threading` module and should
                       always be `None`.
        :param target: The target method to run.
        :param name:   The name of the process. This does not have to be a
                       unique name. Supplying `None` generates one
                       automatically.
        :param args:   Target method invocation arguments.
        :param kwargs: Target method key-value invocation arguments. `None`
                       (the default) means no keyword arguments.
        :param daemon: Specifies if this is a daemon process.
        """
        # The bare `*` in the documented signature of
        # `multiprocessing.Process` marks `daemon` as keyword-only, that's
        # why it is the only argument passed by keyword here.
        multiprocessing.Process.__init__(self,
                                         group,
                                         target,
                                         name,
                                         args,
                                         {} if kwargs is None else kwargs,
                                         daemon=daemon)

        # Set by the parent to request an interrupt, read by the child.
        self._interrupt_event = multiprocessing.Event()
        self._was_started = False
        # One-way channel child -> parent that reports the outcome of an
        # interrupt request: `None` on success, the exception on failure.
        self._exception_pipe_receiver, self._exception_pipe_sender = (
            multiprocessing.Pipe(duplex=False))

    def interrupt(self):
        """
        Safely interrupts the process.

        This raises a KeyboardInterrupt inside the given target.

        This call blocks until the interrupt request was successfully
        delivered. Further calls to this method after a successful interrupt
        raise a `RuntimeError`. Calling this function again after a fail is
        obviously possible.

        :raises RuntimeError: Raised under following conditions:
                              - An interrupt is already requested but not
                                delegated to the target.
                              - Process was not started.
                              - Process is not alive any more.
                              - The interrupt request failed (for unknown
                                reasons).
        """
        # TODO Define more Errors so they can be caught better.
        #      Suggestion:
        #      - RuntimeError: For "Process was not started" and "not alive
        #                      any more"
        #      - InterruptError: For "Interrupt failed" and "Already
        #                        delegating"
        #      Further distinction?
        if self._interrupt_event.is_set():
            # Catch an already set Event, otherwise we would wait for the
            # interrupt exception notification from the Pipe forever.
            raise RuntimeError("Already delegating an interrupt.")
        if not self._was_started:
            raise RuntimeError("Process was not started.")
        if not self.is_alive():
            raise RuntimeError("Process is not alive any more.")

        self._interrupt_event.set()

        # Blocks until the child reports the outcome of the request.
        exception = self._exception_pipe_receiver.recv()
        if exception is not None:
            raise RuntimeError("Interrupt failed.") from exception

    def start(self):
        """
        Starts the process.
        """
        multiprocessing.Process.start(self)
        self._was_started = True

    def run(self):
        """
        Runs the process target.

        The target is executed inside a worker thread while the calling
        thread waits for either the target to finish or an interrupt request
        to arrive and answers each request over the exception pipe.
        """
        worker_thread_joined_event = threading.Event()

        def worker(self):
            try:
                multiprocessing.Process.run(self)
            finally:
                # Also signal termination when the target raised (including
                # the injected KeyboardInterrupt), otherwise the supervising
                # loop below would wait for a dead thread forever.
                worker_thread_joined_event.set()

        worker_thread = threading.Thread(target=worker, args=(self,))
        worker_thread.start()

        while True:
            _wait_for_single(self._interrupt_event,
                             worker_thread_joined_event)

            if worker_thread_joined_event.is_set():
                if self._interrupt_event.is_set():
                    # Received interrupt request but thread already finished.
                    # But don't clear the interrupt event so we can block
                    # further interrupt requests.
                    self._exception_pipe_sender.send(None)
                break
            elif self._interrupt_event.is_set():
                try:
                    self._raise_exception_async(worker_thread,
                                                KeyboardInterrupt)
                except BaseException as ex:
                    # Clear the request before reporting the failure: a
                    # request that stays set would be retried endlessly by
                    # this loop and would make every further `interrupt()`
                    # call fail with "Already delegating an interrupt.".
                    self._interrupt_event.clear()
                    self._exception_pipe_sender.send(ex)
                else:
                    self._exception_pipe_sender.send(None)
                    self._interrupt_event.clear()
                    break

        # It's good practice to join each thread, even if we can be sure it
        # isn't alive any more.
        worker_thread.join()

    @staticmethod
    def _raise_exception_async(thread, exception):
        """
        Raises an exception asynchronously in another thread.

        For threads inside the `threading` module.

        :param thread:       The thread where to raise an exception
                             asynchronously.
        :param exception:    The exception to raise in thread.
        :raises ValueError:  Raised when supplying an invalid or not running
                             thread.
        :raises SystemError: Raised when the async-raise failed on the given
                             thread.
        """
        if not thread.is_alive():
            raise ValueError("Thread is not running.")

        # The C function takes the thread id as `unsigned long`. A plain
        # Python int would be converted to a C `int` by ctypes and could get
        # truncated on 64-bit platforms, so wrap it explicitly.
        thread_id = ctypes.c_ulong(thread.ident)

        # It's not possible (as far as I know) to define an atomic section of
        # code (whether using the Global Interpreter Lock or other
        # mechanisms). We have to hope that Python does not interrupt after
        # the async exception request so we can check if everything is
        # alright and revert in case. If we have bad luck and the
        # async-exception-set fails, the target thread gets executed while it
        # exceeds the count of statements for the next exception check. If
        # this happens the exception is not reverted, we address more than
        # one thread and everything is going to be unstable... It's even not
        # possible with a native C-module.
        # TODO: Really even not possible with C?
        result = ctypes.pythonapi.PyThreadState_SetAsyncExc(
            thread_id,
            ctypes.py_object(exception))

        if result == 0:
            raise ValueError("Invalid thread ID.")
        elif result > 1:
            # Something went wrong, we addressed more than one thread with
            # the async exception request. Revert (`None` is passed as a NULL
            # exception which clears the pending request).
            ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, None)
            raise SystemError("Failed to set an asynchronous exception for "
                              "given thread. More than one thread was "
                              "addressed by the request.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment