Skip to content

Instantly share code, notes, and snippets.

@Makman2
Created August 3, 2015 02:01
Show Gist options
  • Save Makman2/30452063a34cfa1512e1 to your computer and use it in GitHub Desktop.
Save Makman2/30452063a34cfa1512e1 to your computer and use it in GitHub Desktop.
from multiprocessing import Process, Queue
from time import sleep
import os
import ctypes
import sys
import platform
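# NOTE: Windows only! Everything below calls kernel32 functions through
# ctypes and will not run on other platforms.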
class PROCESSENTRY32:
    """Byte-buffer model of the Windows ``PROCESSENTRY32`` structure (ANSI).

    The instance owns a ctypes character buffer stored in ``_as_parameter_``,
    which lets ctypes pass the object itself to foreign functions such as
    ``Process32First``. Every struct member is exposed as a property that
    reads or writes the matching bytes of that buffer in native byte order.
    ``dwSize`` is initialised to the buffer size on construction.

    Raises:
        TypeError: On construction, if ``platform.architecture()`` reports
            something other than ``"32bit"`` or ``"64bit"``.
    """

    _MAX_PATH = 0x104

    # architecture -> (struct size in bytes,
    #                  member offsets in declaration order,
    #                  width of ULONG_PTR in bytes)
    #
    # PROCESSENTRY32 contains one ULONG_PTR member (th32DefaultHeapID) that
    # is 4 bytes wide on x86 and 8 bytes wide on x64. On x64 it has to be
    # 8-byte aligned, so 4 padding bytes sit between th32ProcessID (offset 8)
    # and th32DefaultHeapID (offset 16); dwSize itself stays a 4-byte DWORD.
    _LAYOUTS = {
        "32bit": (296, (0, 4, 8, 12, 16, 20, 24, 28, 32, 36), 4),
        "64bit": (304, (0, 4, 8, 16, 24, 28, 32, 36, 40, 44), 8),
    }

    def __init__(self):
        layout = self._LAYOUTS.get(platform.architecture()[0])
        if layout is None:
            raise TypeError("Unsupported system architecture.")
        dwSize, self._offsets, self._ptr_size = layout

        self._as_parameter_ = ctypes.create_string_buffer(dwSize)
        # Set the dwSize member for this struct automatically.
        self.dwSize = dwSize

    # -- raw accessors ------------------------------------------------------

    def _get_integer(self, pos, length, signed):
        """Return the integer stored in ``length`` bytes starting at ``pos``."""
        return int.from_bytes(self._as_parameter_[pos:pos + length],
                              sys.byteorder,
                              signed=signed)

    def _set_integer(self, value, pos, length, signed):
        """Store ``value`` in ``length`` bytes starting at ``pos``.

        Raises:
            OverflowError: If ``value`` does not fit into the field.
        """
        self._as_parameter_[pos:pos + length] = (
            value.to_bytes(length, sys.byteorder, signed=signed))

    def _get_DWORD(self, pos):
        return self._get_integer(pos, 4, False)

    def _set_DWORD(self, pos, value):
        self._set_integer(value, pos, 4, False)

    def _get_LONG(self, pos):
        return self._get_integer(pos, 4, True)

    def _set_LONG(self, pos, value):
        self._set_integer(value, pos, 4, True)

    def _get_ULONG_PTR(self, pos):
        # ULONG_PTR is an unsigned, pointer-sized integer.
        return self._get_integer(pos, self._ptr_size, False)

    def _set_ULONG_PTR(self, pos, value):
        self._set_integer(value, pos, self._ptr_size, False)

    def _get_TCHAR_array(self, start, stop):
        """Decode the NUL-terminated string stored in bytes ``start:stop``.

        Only the bytes in front of the first NUL are decoded; if there is no
        NUL the whole range is used.
        """
        raw = self._as_parameter_[start:stop]
        # NOTE(review): decodes with Python's default codec (UTF-8), while the
        # ANSI API presumably fills this field using the system code page -
        # confirm for non-ASCII executable names.
        return raw.partition(b"\x00")[0].decode()

    # -- struct members -----------------------------------------------------

    @property
    def dwSize(self):
        return self._get_DWORD(self._offsets[0])

    @dwSize.setter
    def dwSize(self, value):
        self._set_DWORD(self._offsets[0], value)

    @property
    def cntUsage(self):
        return self._get_DWORD(self._offsets[1])

    @cntUsage.setter
    def cntUsage(self, value):
        self._set_DWORD(self._offsets[1], value)

    @property
    def th32ProcessID(self):
        return self._get_DWORD(self._offsets[2])

    @th32ProcessID.setter
    def th32ProcessID(self, value):
        self._set_DWORD(self._offsets[2], value)

    @property
    def th32DefaultHeapID(self):
        return self._get_ULONG_PTR(self._offsets[3])

    @th32DefaultHeapID.setter
    def th32DefaultHeapID(self, value):
        self._set_ULONG_PTR(self._offsets[3], value)

    @property
    def th32ModuleID(self):
        return self._get_DWORD(self._offsets[4])

    @th32ModuleID.setter
    def th32ModuleID(self, value):
        self._set_DWORD(self._offsets[4], value)

    @property
    def cntThreads(self):
        return self._get_DWORD(self._offsets[5])

    @cntThreads.setter
    def cntThreads(self, value):
        self._set_DWORD(self._offsets[5], value)

    @property
    def th32ParentProcessID(self):
        return self._get_DWORD(self._offsets[6])

    @property
    def pcPriClassBase(self):
        return self._get_LONG(self._offsets[7])

    @property
    def dwFlags(self):
        return self._get_DWORD(self._offsets[8])

    @property
    def szExeFile(self):
        start = self._offsets[9]
        return self._get_TCHAR_array(start, start + self._MAX_PATH)

    @szExeFile.setter
    def szExeFile(self, value):
        data = value.encode()
        # The field holds bytes, so the limit applies to the encoded length,
        # not to the number of characters.
        if len(data) > self._MAX_PATH:
            raise OverflowError("value bigger than MAX_PATH (" +
                                hex(self._MAX_PATH) + ").")
        start = self._offsets[9]
        # Fill up with zeros so the slice assignment covers the whole field.
        self._as_parameter_[start:start + self._MAX_PATH] = (
            data.ljust(self._MAX_PATH, b"\x00"))
def collect_first_process(pid):
    """Take a process snapshot and return its first entry.

    The snapshot handle is printed for debugging and is always closed again
    before the function returns.

    Args:
        pid: Unused; kept so existing callers keep working.

    Returns:
        The PROCESSENTRY32 instance that was passed to ``Process32First``.
        The return value of ``Process32First`` is not checked, so the entry
        is left as constructed if that call fails.

    Raises:
        OSError: If ``CreateToolhelp32Snapshot`` returns an invalid handle.
    """
    TH32CS_SNAPPROCESS = 0x2

    # Use a private library object so that changing restype below does not
    # affect other users of the shared ctypes.windll.kernel32 instance.
    kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
    # A HANDLE is pointer-sized; ctypes' default int return type could
    # truncate it on 64-bit builds.
    kernel32.CreateToolhelp32Snapshot.restype = ctypes.c_void_p
    invalid_handle_value = ctypes.c_void_p(-1).value

    entry = PROCESSENTRY32()
    snapshot_handle = kernel32.CreateToolhelp32Snapshot(
        TH32CS_SNAPPROCESS,
        0)  # last param is ignored due to TH32CS_SNAPPROCESS
    print(snapshot_handle)
    if snapshot_handle == invalid_handle_value:
        raise ctypes.WinError(ctypes.get_last_error())

    # Wrap the handle so it is passed pointer-sized instead of as a C int.
    handle = ctypes.c_void_p(snapshot_handle)
    try:
        kernel32.Process32First(handle, entry)
    finally:
        kernel32.CloseHandle(handle)
    return entry
def interrupt_process(pid):
    """Send a Ctrl+C console event for ``pid`` and wait for it to arrive here.

    After calling ``GenerateConsoleCtrlEvent`` the function sleeps in a loop
    until a KeyboardInterrupt is raised in the calling process, then swallows
    that exception and returns.

    Args:
        pid: Value passed to ``GenerateConsoleCtrlEvent`` as the target of
            the Ctrl+C event.
    """
    CTRL_C_EVENT = 0x0
    TH32CS_SNAPPROCESS = 0x2

    # Private library object so the restype change stays local.
    kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
    # A HANDLE is pointer-sized; the default int return type could truncate
    # it on 64-bit builds.
    kernel32.CreateToolhelp32Snapshot.restype = ctypes.c_void_p
    invalid_handle_value = ctypes.c_void_p(-1).value

    try:
        # Check group id's.
        snapshot_handle = kernel32.CreateToolhelp32Snapshot(
            TH32CS_SNAPPROCESS,
            0)  # last param is ignored due to TH32CS_SNAPPROCESS
        # The snapshot is not needed for sending the event yet (see TODO),
        # so a failed snapshot is skipped instead of aborting the interrupt.
        if snapshot_handle != invalid_handle_value:
            handle = ctypes.c_void_p(snapshot_handle)
            try:
                entry = PROCESSENTRY32()
                kernel32.Process32First(handle, entry)
                # Check if process ID matches current pid (os.getpid()) or
                # the given one. If match, retrieve the parent ID. At the end
                # check whether both do match, and if they match,
                # interrupt_process needs to wait for KeyboardInterrupt since
                # pid and current process share the same process group. If
                # not, don't wait for KeyboardInterrupt.
                # TODO: Implement Process32Next() to search through all
                #       entries.
            finally:
                kernel32.CloseHandle(handle)

        kernel32.GenerateConsoleCtrlEvent(CTRL_C_EVENT, pid)

        # DON'T USE os.kill(pid, signal.CTRL_C_EVENT) HERE!!! THAT WON'T WORK
        # AND GENERATES A COMPLETELY DIFFERENT SIGNAL NOT RECOGNIZED FROM
        # PYTHON AS KEYBOARDINTERRUPT!!!

        # Wait for KeyboardInterrupt so we can be sure it was raised also in
        # the subprocess.
        while True:
            sleep(60)
    except KeyboardInterrupt:
        # KeyboardInterrupt successfully raised in process.
        pass
def compute_fibonacci(step, q):
    """Slowly iterate a Fibonacci-style recurrence and put the result on ``q``.

    Starting from the pair (1, 1), ``step`` iterations are run, each printing
    a progress line and sleeping for half a second. The final value is handed
    to ``q.put``.

    Any exception raised along the way - including KeyboardInterrupt - is
    swallowed after printing an abort message, and a final message is printed
    in every case.

    Args:
        step: Number of iterations to run.
        q: Object with a ``put`` method that receives the result.
    """
    try:
        current, previous = 1, 1
        for _ in range(step):
            print("COMPUTING...")
            current, previous = current + previous, current
            sleep(0.5)
        print("RESULT STORED")
        q.put(current)
    except BaseException:
        # Deliberately catches everything, KeyboardInterrupt included.
        print("### ABORTED ###")
    finally:
        print("### FINALLY EXECUTED ###")
if __name__ == "__main__":
    # Demo: start the computation in a child process, interrupt it after two
    # seconds and wait for it to exit.
    print("INITIALIZING")
    result_queue = Queue()
    worker = Process(target=compute_fibonacci, args=(10, result_queue))

    print("STARTING...")
    worker.start()
    print("STARTED")

    # Let the worker run for a moment before interrupting it.
    sleep(2)

    print("MAIN PID", os.getpid(), sep="\n")
    print("THREAD PID", worker.pid, sep="\n")

    interrupt_process(worker.pid)
    worker.join()

    print("COMPUTATION SUCCESSFUL, RESULT:")
    # Important: Don't fetch the last entry of a queue if you interrupted
    # its process since it corrupts the queue and blocks permanently (even
    # with a specified timeout). That is why result_queue.get() is
    # deliberately not called here.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment