-
-
Save calladoum-elastic/328068f19e60a76b00f20cdb936cd078 to your computer and use it in GitHub Desktop.
Basic client for the ProcLaunchMon.sys device driver
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python -i | |
""" | |
To use: | |
``` | |
$ python -i ProcLaunchMon.py | |
>>> sess | |
``` | |
""" | |
import os | |
import struct | |
import time | |
import threading | |
from ctypes import * | |
IOCTL_CODES = { | |
"Version": 0x22404C, | |
"GetNextEvent": 0x22403C, | |
"RemovePid": 0x228038, | |
"AddPid": 0x228034, | |
"DuplicateHandle": 0x224048, | |
"ResumeProcess": 0x224040, | |
"Shutdown": 0x224044, | |
"GetSessionId": 0x224050, | |
} | |
DEBUG = True | |
def u32(x): return struct.unpack("<I", x[0:4])[0] | |
def u64(x): return struct.unpack("<Q", x[0:8])[0] | |
def p32(x): return struct.pack("<I", x) | |
def p64(x): return struct.pack("<Q", x) | |
def ok(x): print(f"[+] {x}") | |
def dbg(x): | |
if DEBUG: | |
windll.kernel32.OutputDebugStringW(f"[+] {x}{os.linesep}") | |
class const: | |
GENERIC_READ = 0x80000000 | |
GENERIC_WRITE = 0x40000000 | |
OPEN_EXISTING = 3 | |
DEVICE_PATH = r'''\\.\com_microsoft_idna_ProcLaunchMon''' | |
def hexdump(source: bytes, length=0x10, base=0): | |
result = [] | |
for i in range(0, len(source), length): | |
chunk = bytearray(source[i:i + length]) | |
hexa = " ".join(["%.02x" % c for c in chunk]) | |
text = "".join([chr(b) if 0x20 <= b < 0x7F else "." for b in chunk]) | |
result.append("{addr:#018x} {data:<{dw}} {text}".format(addr=base+i,dw=3*length,data=hexa,text=text)) | |
return os.linesep.join(result) | |
class DriverSession: | |
def __init__(self): | |
DesiredAccess = const.GENERIC_READ | const.GENERIC_WRITE | |
self.hDevice = windll.kernel32.CreateFileW(const.DEVICE_PATH, DesiredAccess, 0, None, const.OPEN_EXISTING, 0, None) | |
if self.hDevice == -1: | |
raise IOError(f"""Cannot get handle to "{const.DEVICE_PATH}" """) | |
self.Version() | |
self.GetSessionId() | |
self.StartEventThread() | |
def __thread_routine(self): | |
while self.keep_running: | |
self.GetNextEvent() | |
time.sleep(1) | |
self.DuplicateHandle() | |
def StartEventThread(self): | |
self.keep_running = True | |
self.thread = threading.Thread(target=self.__thread_routine) | |
self.thread.daemon = True | |
self.thread.start() | |
def __del__(self): | |
ok(f"Stopping thread...") | |
self.keep_running = False | |
self.thread.join() | |
ok(f"Closing device handle...") | |
windll.kernel32.CloseHandle(self.hDevice) | |
def __repr__(self) -> str: | |
hDevice = self.hDevice | |
return f"DriverSession({hDevice=})" | |
def DeviceIoctlControl(self, IoctlCode: int, _in:bytes=b'', _out:bytes=b'', *args, **kwargs) -> tuple[bool, bytes]: | |
dwBytesReturned = c_uint32() | |
InputBufferSize = kwargs.get('_inlen', len(_in)) | |
OutputBufferSize = kwargs.get('_outlen', len(_out)) | |
InputBuffer = create_string_buffer(InputBufferSize) | |
OutputBuffer = create_string_buffer(OutputBufferSize) | |
InputBuffer.value = _in | |
OutputBuffer.value = _out | |
isSuccess = windll.kernel32.DeviceIoControl( | |
self.hDevice, IoctlCode, InputBuffer, InputBufferSize, | |
OutputBuffer, OutputBufferSize, byref(dwBytesReturned), | |
None | |
) | |
if isSuccess: | |
dbg(f'DeviceIoControl({IoctlCode=:#x}) = True, {dwBytesReturned=}') | |
if dwBytesReturned: | |
dbg(hexdump(OutputBuffer.raw)) | |
return isSuccess, bytearray(OutputBuffer) | |
return isSuccess, b'' | |
gle = GetLastError() | |
if gle != 0x32: | |
reason = FormatError(GetLastError()) | |
dbg(f"DeviceIoControl({IoctlCode=:#x}) = False, {gle=:#x}, {reason=}") | |
return isSuccess, b'' | |
def Trigger(self, ioctl, input, output): | |
return self.DeviceIoctlControl(ioctl, bytes(input), bytes(output)) | |
def GetNextEvent(self): | |
isSuccess, out = self.Trigger(IOCTL_CODES["GetNextEvent"], b'', bytearray([0x00]*0x10)) | |
if isSuccess: | |
ppid, pid, unk1, unk2 = struct.unpack("<IIII", out) | |
dbg(f"{pid=:d} {ppid=:d} {unk1=:#x} {unk2=:#x}") | |
return isSuccess | |
def ResumeProcess(self): | |
return self.Trigger(IOCTL_CODES["ResumeProcess"], bytearray([0x00]*0x4), b'') | |
def Shutdown(self): | |
return self.Trigger(IOCTL_CODES["Shutdown"], b'', b'') | |
def DuplicateHandle(self) -> bool: | |
isSuccess, out = self.Trigger(IOCTL_CODES["DuplicateHandle"], b'', bytearray([0x00]*0x8)) | |
if out: | |
handle = u64(out) | |
dbg(f"{handle=:#x}") | |
return isSuccess | |
def Version(self): | |
return self.Trigger(IOCTL_CODES["Version"], b'', bytearray([0x00]*4)) | |
def GetSessionId(self): | |
return self.Trigger(IOCTL_CODES["GetSessionId"], b'', bytearray([0x00]*0x10)) | |
def AddPid(self, pid: int): | |
return self.Trigger(IOCTL_CODES["AddPid"], p32(pid), b'') | |
def RemovePid(self, pid: int): | |
return self.Trigger(IOCTL_CODES["RemovePid"], p32(pid), bytearray([0x00]*0x4)) | |
sess = DriverSession() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment