Skip to content

Instantly share code, notes, and snippets.

@zackw
Last active January 1, 2016 01:06
Show Gist options
  • Save zackw/d9e6afcef851929b0d45 to your computer and use it in GitHub Desktop.
Save zackw/d9e6afcef851929b0d45 to your computer and use it in GitHub Desktop.
subprocess.Popen wrapper that kills descendants on subprocess exit
#! /usr/bin/env python
# jobspawntest.py - experimental subprocess.Popen wrapper that kills
# descendants on subprocess exit. Windows testing needed.
# Copyright 2015 Zack Weinberg <zackw@panix.com>
# Copying and distribution of this file, with or without modification,
# are permitted in any medium without royalty provided the copyright
# notice and this notice are preserved. This file is offered as-is,
# without any warranty.
# Public API: only the Job wrapper is exported; the platform-specific
# _do_* helpers defined below are internal.
__all__ = ('Job',)
import os
import subprocess
import signal
if os.name == 'posix':
import errno
def _do_popen(args, kwargs):
if 'preexec_fn' in kwargs:
caller_preexec = kwargs['preexec_fn']
else:
caller_preexec = lambda: None
def child_call_setpgid_and_chain():
os.setpgid(0, 0)
caller_preexec()
kwargs['preexec_fn'] = child_call_setpgid_and_chain
proc = subprocess.Popen(*args, **kwargs)
# The parent process must _also_ call setpgid() to prevent a race.
# See https://www.gnu.org/software/libc/manual/html_node/Launching-Jobs.html
# We may get EACCES here if the child has already called execve();
# in that case it has also already called setpgid() so no worries.
pgid = proc.pid
try:
os.setpgid(pgid, pgid)
except OSError as e:
if e.errno != errno.EACCES:
raise
return (proc, pgid)
def _do_send_signal(job, signal):
os.killpg(job, signal)
def _do_terminate(job):
_do_send_signal(job, signal.SIGTERM)
def _do_kill(job):
_do_send_signal(job, signal.SIGKILL)
elif os.name == 'nt':
import sys
# Nested job objects were added in Windows 8, which identifies
# itself as 6.2 in getwindowsversion().
ver = sys.getwindowsversion()
if ver.major > 6 or (ver.major == 6 and ver.minor >= 2):
_ADD_CREATIONFLAGS = 0x00000004 # CREATE_SUSPENDED
else:
_ADD_CREATIONFLAGS = 0x01000004 # CREATE_SUSPENDED|CREATE_BREAKAWAY
import ctypes
from ctypes.wintypes import HANDLE, LPVOID, UINT, BOOL, DWORD, LONG
def _ec_falsy_winerror(result, *etc):
if not result:
raise ctypes.WinError()
return result
def _ec_m1_winerror(result, *etc):
if result < 0:
raise ctypes.WinError()
return result
_kernel32 = ctypes.WinDLL("kernel32.dll")
_kernel32.CreateJobObjectW.argtypes = (LPVOID, LPVOID)
_kernel32.CreateJobObjectW.restype = HANDLE
_kernel32.CreateJobObjectW.errcheck = _ec_falsy_winerror
_kernel32.TerminateJobObject.argtypes = (HANDLE, UINT)
_kernel32.TerminateJobObject.restype = BOOL
_kernel32.TerminateJobObject.errcheck = _ec_falsy_winerror
_kernel32.AssignProcessToJobObject.argtypes = (HANDLE, HANDLE)
_kernel32.AssignProcessToJobObject.restype = BOOL
_kernel32.AssignProcessToJobObject.errcheck = _ec_falsy_winerror
_kernel32.CloseHandle.argtypes = (HANDLE,)
_kernel32.CloseHandle.restype = BOOL
_kernel32.CloseHandle.errcheck = _ec_falsy_winerror
# defensiveness against handle leakage
class wrap_HANDLE(object):
__slots__ = ('_h',)
def __init__(self, h): self._h = h
def __int__(self): return self._h
def __nonzero__(self): return bool(self._h)
def __del__(self, _CloseHandle=_kernel32.CloseHandle):
if self._h:
_CloseHandle(self._h)
self._h = 0
close = __del__
# subprocess.Popen retains the process handle but not the
# thread handle, which we need to resume the suspended thread.
# The only documented way to recover a thread handle appears
# to be using the "tool help" API, which, fortunately, is in
# kernel32 since XP.
class THREADENTRY32(ctypes.Structure):
_fields_ = [
("dwSize", DWORD),
("cntUsage", DWORD),
("th32ThreadID", DWORD),
("th32OwnerProcessID", DWORD),
("tpBasePri", LONG),
("tpDeltaPri", LONG),
("dwFlags", DWORD),
]
LPTHREADENTRY32 = ctypes.POINTER(THREADENTRY32)
_kernel32.CreateToolhelp32Snapshot.argtypes = (DWORD, DWORD)
_kernel32.CreateToolhelp32Snapshot.restype = HANDLE
_kernel32.CreateToolhelp32Snapshot.errcheck = _ec_falsy_winerror
_kernel32.Thread32First.argtypes = (HANDLE, LPTHREADENTRY32)
_kernel32.Thread32First.restype = BOOL
_kernel32.Thread32First.errcheck = _ec_falsy_winerror
_kernel32.Thread32Next.argtypes = (HANDLE, LPTHREADENTRY32)
_kernel32.Thread32Next.restype = BOOL
#_kernel32.Thread32Next cannot fail
_kernel32.OpenThread.argtypes = (DWORD, BOOL, DWORD)
_kernel32.OpenThread.restype = HANDLE
_kernel32.OpenThread.errcheck = _ec_falsy_winerror
_kernel32.ResumeThread.argtypes = (HANDLE,)
_kernel32.ResumeThread.restype = DWORD
_kernel32.ResumeThread.errcheck = _ec_m1_winerror
def _resume_threads(pid):
thblock = THREADENTRY32()
thblock.dwSize = ctypes.sizeof(thblock)
pthblock = ctypes.pointer(thblock)
try:
# TH32CS_SNAPTHREAD (0x4) gives us all threads on the whole
# system, and we have to filter them. There's no way to get
# kernel32 to do that for us.
hsnap = _kernel32.CreateToolhelp32Snapshot(0x4, 0)
_kernel32.Thread32First(hsnap, pthblock)
while True:
if pthblock.th32OwnerProcessID == pid:
try:
hthread = _kernel32.OpenThread(
0x0002, # THREAD_SUSPEND_RESUME
False, pthblock.th32ThreadID)
_kernel32.ResumeThread(hthread)
finally:
_kernel32.CloseHandle(hthread)
if not _kernel32.Thread32Next(hsnap, pthblock):
break
finally:
_kernel32.CloseHandle(hsnap)
def _do_popen(args, kwargs):
job = _kernel32.CreateJobObjectW(None, None)
flags = kwargs.get('creationflags', 0)
flags |= _ADD_CREATIONFLAGS
kwargs['creationflags'] = flags
proc = subprocess.Popen(*args, **kwargs)
_kernel32.AssignProcessToJobObject(job, int(proc._handle))
_resume_threads(proc.pid)
return (proc, wrap_HANDLE(job))
def _do_send_signal(job, sig):
if sig == signal.SIGTERM:
_do_terminate(job)
else:
# There's no way to send CTRL_C_EVENT or CTRL_BREAK_EVENT to an
# entire job, as far as I can tell.
raise ValueError("Unsupported signal: {}".format(sig))
def _do_terminate(job):
try:
hjob = int(job)
if hjob:
_kernel32.TerminateJobObject(hjob, 1)
job.close()
except OSError as e:
# Comments in Windows subprocess.terminate() say that
# "ERROR_ACCESS_DENIED (winerror 5) is received when the
# process already died." MSDN does not document whether
# this is true for job objects, but it seems plausible.
if e.winerror != 5:
raise
# We are not in a position to call GetExitCodeProcess here.
# Just leave it to subprocess.poll().
def _do_kill(job):
_do_terminate(job)
else:
raise ValueError('sorry, not implemented: process groups for ostype "{}"'
.format(os.name))
class Job(object):
    """A Job object wraps a subprocess.Popen object; it is functionally
    identical, except that terminate() and kill() are applied to
    all child processes _of_ the child process, as well as the
    child process itself. Moreover, when the child process exits,
    all of its children are killed.

    "When the child process exits" means: as soon as poll(), wait() or
    communicate() observes the exit.  From then on the job has been
    torn down, and terminate(), kill() and send_signal() do nothing.

    On Unix, this is accomplished with process groups; on Windows,
    with job objects. Descendant processes _can_ escape containment;
    on Unix, by using setpgid(); on Windows, by being created as
    "breakaway" processes.

    On Unix, send_signal() is also applied to the process group; on
    Windows, this only works for signal.SIGTERM (which is mapped to
    terminate()).
    """

    def __init__(self, *args, **kwargs):
        if len(args) > 1:
            raise TypeError("Job() optional arguments must be specified as "
                            "keyword arguments")
        self._proc, self._job = _do_popen(args, kwargs)

    def _reap_descendants(self):
        """Kill whatever is left of the job after the child has exited,
        then forget the job.  Does nothing if that already happened."""
        if self._job is None:
            return
        try:
            _do_terminate(self._job)
        except OSError as e:
            import errno
            # On POSIX, killpg() fails with ESRCH when every member of
            # the process group has already exited: nothing to clean up.
            if e.errno != errno.ESRCH:
                raise
        self._job = None

    def send_signal(self, signal):
        if self._job is None:
            return None
        return _do_send_signal(self._job, signal)

    def terminate(self):
        if self._job is None:
            return None
        return _do_terminate(self._job)

    def kill(self):
        if self._job is None:
            return None
        return _do_kill(self._job)

    def poll(self):
        """Like Popen.poll(); also kills leftover descendants on exit."""
        rv = self._proc.poll()
        if rv is not None:
            self._reap_descendants()
        return rv

    def wait(self, *args, **kwargs):
        """Like Popen.wait(); also kills leftover descendants on exit."""
        rv = self._proc.wait(*args, **kwargs)
        self._reap_descendants()
        return rv

    def communicate(self, *args, **kwargs):
        """Like Popen.communicate(); also kills leftover descendants."""
        result = self._proc.communicate(*args, **kwargs)
        self._reap_descendants()
        return result

    # Forward all other actions to _proc.
    def __getattr__(self, aname):
        # Only reached for missing attributes.  If __init__ never set
        # _proc/_job, fail cleanly instead of recursing forever.
        if aname in ('_proc', '_job'):
            raise AttributeError(aname)
        return getattr(self._proc, aname)
if __name__ == '__main__':
    import sys
    import time

    def _emit(text):
        """Write *text* to stdout and flush it immediately.

        The child and grandchild are killed by job.terminate() before
        they can exit normally, so anything left in their stdio buffers
        would be lost.  Flushing also keeps the three processes' lines
        in order when stdout is a pipe or file rather than a terminal.
        """
        sys.stdout.write(text)
        sys.stdout.flush()

    def test_main():
        """Self-test: parent -> child -> grandchild.  The parent
        terminates the job after 1s, which must also take out the
        grandchild (still sleeping) before it prints its second line."""
        if len(sys.argv) > 1 and sys.argv[1] == 'grandchild':
            _emit('grandchild: started\n')
            time.sleep(2)
            _emit('grandchild: message should not appear\n')
            sys.exit(1)
        elif len(sys.argv) > 1 and sys.argv[1] == 'child':
            _emit('child: started\n')
            proc = subprocess.Popen([
                sys.executable, __file__, 'grandchild'])
            rc = proc.wait()
            _emit('child: grandchild exit {} (should not appear)\n'
                  .format(rc))
        else:
            _emit('parent: started\n')
            job = Job([
                sys.executable, __file__, 'child'])
            time.sleep(1)
            job.terminate()
            rc = job.wait()
            _emit('parent: child exit {}\n'.format(rc))

    test_main()
@zackw
Copy link
Author

zackw commented Dec 31, 2015

If this is working correctly, it will print

parent: started
child: started
grandchild: started
parent: child exit -15

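when run as a standalone script. The exit status on the last line may vary by platform: on POSIX it is -15 (killed by SIGTERM), while on Windows terminate() ends the job with exit code 1.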

@vitallium
Copy link

Python 2.7.10, Win10 X64:

Traceback (most recent call last):
  File "jobspawntest.py", line 84, in <module>
    _kernel32.CreateJobObject.argtypes = (LPVOID, LPVOID)
  File "C:\Python27\lib\ctypes\__init__.py", line 378, in __getattr__
    func = self.__getitem__(name)
  File "C:\Python27\lib\ctypes\__init__.py", line 383, in __getitem__
    func = self._FuncPtr((name_or_ordinal, self))
AttributeError: function 'CreateJobObject' not found

@vitallium
Copy link

> python jobspawntest.py
parent: started
Traceback (most recent call last):
  File "jobspawntest.py", line 292, in <module>
    test_main()
  File "jobspawntest.py", line 286, in test_main
    sys.executable, __file__, 'child'])
  File "jobspawntest.py", line 244, in __init__
    self._proc, self._job = _do_popen(args, kwargs)
  File "jobspawntest.py", line 187, in _do_popen
    _kernel32.AssignProcessToJobObject(job, proc._handle)
ctypes.ArgumentError: argument 2: <type 'exceptions.TypeError'>: wrong type

@zackw
Copy link
Author

zackw commented Dec 31, 2015

whack one mole, up comes the next one

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment